You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2022/02/18 09:42:27 UTC
[ignite] branch sql-calcite updated: IGNITE-16151 Introduce query planner timeout. (#9821)
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 023bee6 IGNITE-16151 Introduce query planner timeout. (#9821)
023bee6 is described below
commit 023bee6c36ba031e73f55a7f37399c8ff9eaa7f8
Author: Ivan Daschinskiy <iv...@apache.org>
AuthorDate: Fri Feb 18 12:42:08 2022 +0300
IGNITE-16151 Introduce query planner timeout. (#9821)
---
.../query/calcite/CalciteQueryProcessor.java | 25 ++++-
.../processors/query/calcite/RootQuery.java | 11 +-
.../query/calcite/prepare/BaseQueryContext.java | 2 +-
.../query/calcite/prepare/IgnitePlanner.java | 17 +++
.../query/calcite/prepare/PlanningContext.java | 43 +++++++-
.../integration/AbstractBasicIntegrationTest.java | 75 ++++++++++++++
.../integration/IndexScanlIntegrationTest.java | 104 +++++--------------
.../integration/RunningQueriesIntegrationTest.java | 48 +++++++++
.../query/calcite/planner/AbstractPlannerTest.java | 40 ++++----
.../query/calcite/planner/PlannerTest.java | 16 +--
.../query/calcite/planner/PlannerTimeoutTest.java | 114 +++++++++++++++++++++
.../apache/ignite/testsuites/PlannerTestSuite.java | 2 +
.../ignite/startup/cmdline/CommandLineStartup.java | 4 +
13 files changed, 385 insertions(+), 116 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 33b95d6..3429bad 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -40,6 +40,7 @@ import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -88,8 +89,22 @@ import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.getLong;
+
/** */
public class CalciteQueryProcessor extends GridProcessorAdapter implements QueryEngine {
+ /**
+ * Default planner timeout, in ms.
+ */
+ private static final long DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT = 15000;
+
+ /**
+ * Planner timeout property name.
+ */
+ @SystemProperty(value = "Timeout of calcite based sql engine's planner, in ms", type = Long.class,
+ defaults = "" + DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT)
+ public static final String IGNITE_CALCITE_PLANNER_TIMEOUT = "IGNITE_CALCITE_PLANNER_TIMEOUT";
+
/** */
public static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder()
.executor(new RexExecutorImpl(DataContexts.EMPTY))
@@ -135,6 +150,10 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
})
.build();
+ /** Query planner timeout. */
+ private final long queryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT,
+ DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);
+
/** */
private final QueryPlanCache qryPlanCache;
@@ -312,7 +331,8 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
qryCtx,
exchangeSvc,
(q) -> qryReg.unregister(q.id()),
- log
+ log,
+ queryPlannerTimeout
);
qryReg.register(qry);
@@ -351,7 +371,8 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
qryCtx,
exchangeSvc,
(q) -> qryReg.unregister(q.id()),
- log
+ log,
+ queryPlannerTimeout
);
qrys.add(qry);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 7cca5e1..868ebbc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -83,6 +83,9 @@ public class RootQuery<RowT> extends Query<RowT> {
private final BaseQueryContext ctx;
/** */
+ private final long plannerTimeout;
+
+ /** */
public RootQuery(
String sql,
SchemaPlus schema,
@@ -90,7 +93,8 @@ public class RootQuery<RowT> extends Query<RowT> {
QueryContext qryCtx,
ExchangeService exch,
Consumer<Query<RowT>> unregister,
- IgniteLogger log
+ IgniteLogger log,
+ long plannerTimeout
) {
super(
UUID.randomUUID(),
@@ -108,6 +112,8 @@ public class RootQuery<RowT> extends Query<RowT> {
remoteFragments = new HashMap<>();
waiting = new HashSet<>();
+ this.plannerTimeout = plannerTimeout;
+
Context parent = Commons.convert(qryCtx);
ctx = BaseQueryContext.builder()
@@ -130,7 +136,7 @@ public class RootQuery<RowT> extends Query<RowT> {
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
- return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log);
+ return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), exch, unregister, log, plannerTimeout);
}
/** */
@@ -291,6 +297,7 @@ public class RootQuery<RowT> extends Query<RowT> {
.parentContext(ctx)
.query(sql)
.parameters(params)
+ .plannerTimeout(plannerTimeout)
.build();
try {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
index 076701f..f45e2a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/BaseQueryContext.java
@@ -266,7 +266,7 @@ public final class BaseQueryContext extends AbstractQueryContext {
/**
* Query context builder.
*/
- @SuppressWarnings("PublicInnerClass")
+ @SuppressWarnings("PublicInnerClass")
public static class Builder {
/** */
private static final FrameworkConfig EMPTY_CONFIG =
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 84a3245..db7d66e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetada
import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Query planer.
@@ -518,5 +519,21 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
@Override public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {
return mq.getCumulativeCost(rel);
}
+
+ /** {@inheritDoc} */
+ @Override public void checkCancel() {
+ PlanningContext ctx = getContext().unwrap(PlanningContext.class);
+
+ long timeout = ctx.plannerTimeout();
+
+ if (timeout > 0) {
+ long startTs = ctx.startTs();
+
+ if (U.currentTimeMillis() - startTs > timeout)
+ cancelFlag.set(true);
+ }
+
+ super.checkCancel();
+ }
}
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
index 3754ce7..1bb765a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanningContext.java
@@ -31,6 +31,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.util.CancelFlag;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
/**
@@ -55,18 +56,27 @@ public final class PlanningContext implements Context {
/** */
private IgnitePlanner planner;
+ /** */
+ private final long startTs;
+
+ /** */
+ private final long plannerTimeout;
+
/**
* Private constructor, used by a builder.
*/
private PlanningContext(
Context parentCtx,
String qry,
- Object[] parameters
+ Object[] parameters,
+ long plannerTimeout
) {
this.qry = qry;
this.parameters = parameters;
this.parentCtx = parentCtx;
+ startTs = U.currentTimeMillis();
+ this.plannerTimeout = plannerTimeout;
}
/**
@@ -100,6 +110,20 @@ public final class PlanningContext implements Context {
}
/**
+ * @return Start timestamp in millis.
+ */
+ public long startTs() {
+ return startTs;
+ }
+
+ /**
+ * @return Planner timeout in millis.
+ */
+ public long plannerTimeout() {
+ return plannerTimeout;
+ }
+
+ /**
* @return Schema.
*/
public SchemaPlus schema() {
@@ -189,7 +213,7 @@ public final class PlanningContext implements Context {
/**
* Planner context builder.
*/
- @SuppressWarnings("PublicInnerClass")
+ @SuppressWarnings("PublicInnerClass")
public static class Builder {
/** */
private Context parentCtx = Contexts.empty();
@@ -200,6 +224,9 @@ public final class PlanningContext implements Context {
/** */
private Object[] parameters;
+ /** */
+ private long plannerTimeout;
+
/**
* @param parentCtx Parent context.
* @return Builder for chaining.
@@ -229,12 +256,22 @@ public final class PlanningContext implements Context {
}
/**
+ * @param plannerTimeout Planner timeout.
+ *
+ * @return Builder for chaining.
+ */
+ public Builder plannerTimeout(long plannerTimeout) {
+ this.plannerTimeout = plannerTimeout;
+ return this;
+ }
+
+ /**
* Builds planner context.
*
* @return Planner context.
*/
public PlanningContext build() {
- return new PlanningContext(parentCtx, qry, parameters);
+ return new PlanningContext(parentCtx, qry, parameters, plannerTimeout);
}
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 782a05c..d55b463 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -18,6 +18,14 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
import java.util.List;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
@@ -29,11 +37,18 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
+import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -165,6 +180,66 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
}
/** */
+ public static class DelegatingIgniteIndex implements IgniteIndex {
+ /** */
+ protected final IgniteIndex delegate;
+
+ /** */
+ public DelegatingIgniteIndex(IgniteIndex delegate) {
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelCollation collation() {
+ return delegate.collation();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return delegate.name();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTable table() {
+ return delegate.table();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogicalIndexScan toRel(
+ RelOptCluster cluster,
+ RelOptTable relOptTbl,
+ @Nullable List<RexNode> proj,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return delegate.toRel(cluster, relOptTbl, proj, cond, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IndexConditions toIndexCondition(
+ RelOptCluster cluster,
+ @Nullable RexNode cond,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return delegate.toIndexCondition(cluster, cond, requiredColumns);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp,
+ Predicate<Row> filters,
+ Supplier<Row> lowerIdxConditions,
+ Supplier<Row> upperIdxConditions,
+ Function<Row, Row> rowTransformer,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ return delegate.scan(execCtx, grp, filters, lowerIdxConditions, upperIdxConditions, rowTransformer,
+ requiredColumns);
+ }
+ }
+
+ /** */
public static class Employer {
/** */
@QuerySqlField
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index 97056f3..dc9e1e1 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -17,22 +17,15 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
-import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
-import org.apache.ignite.internal.processors.query.calcite.rel.logical.IgniteLogicalIndexScan;
-import org.apache.ignite.internal.processors.query.calcite.schema.IgniteIndex;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
-import org.apache.ignite.internal.processors.query.calcite.util.IndexConditions;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -53,10 +46,30 @@ public class IndexScanlIntegrationTest extends AbstractBasicIntegrationTest {
executeSql("CREATE INDEX t_idx ON t(i1)");
IgniteTable tbl = (IgniteTable)queryProcessor(grid(0)).schemaHolder().schema("PUBLIC").getTable("T");
- IgniteIndex idxOld = tbl.getIndex("T_IDX");
- TestIgniteIndex idxNew = new TestIgniteIndex(idxOld);
- tbl.addIndex(idxNew);
+ AtomicInteger filteredRows = new AtomicInteger();
+ tbl.addIndex(new DelegatingIgniteIndex(tbl.getIndex("T_IDX")) {
+ @Override public <Row> Iterable<Row> scan(
+ ExecutionContext<Row> execCtx,
+ ColocationGroup grp,
+ Predicate<Row> filters,
+ Supplier<Row> lowerIdxConditions,
+ Supplier<Row> upperIdxConditions,
+ Function<Row, Row> rowTransformer,
+ @Nullable ImmutableBitSet requiredColumns
+ ) {
+ Predicate<Row> filter = row -> {
+ filteredRows.incrementAndGet();
+
+ return true;
+ };
+
+ filters = filter.and(filters);
+
+ return delegate.scan(execCtx, grp, filters, lowerIdxConditions, upperIdxConditions, rowTransformer,
+ requiredColumns);
+ }
+ });
String sql = "SELECT /*+ DISABLE_RULE('NestedLoopJoinConverter', 'MergeJoinConverter') */ t1.i1, t2.i1 " +
"FROM t t1 " +
@@ -74,73 +87,6 @@ public class IndexScanlIntegrationTest extends AbstractBasicIntegrationTest {
// There shouldn't be full index scan in case of null values in search row, only one value must be found by
// range scan and passed to predicate.
- assertEquals(1, idxNew.filteredRows);
- }
-
- /** */
- private static class TestIgniteIndex implements IgniteIndex {
- /** */
- private final IgniteIndex delegate;
-
- /** */
- private int filteredRows;
-
- /** */
- public TestIgniteIndex(IgniteIndex delegate) {
- this.delegate = delegate;
- }
-
- /** {@inheritDoc} */
- @Override public RelCollation collation() {
- return delegate.collation();
- }
-
- /** {@inheritDoc} */
- @Override public String name() {
- return delegate.name();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteTable table() {
- return delegate.table();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogicalIndexScan toRel(
- RelOptCluster cluster,
- RelOptTable relOptTbl,
- @Nullable List<RexNode> proj,
- @Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- return delegate.toRel(cluster, relOptTbl, proj, cond, requiredColumns);
- }
-
- /** {@inheritDoc} */
- @Override public IndexConditions toIndexCondition(
- RelOptCluster cluster,
- @Nullable RexNode cond,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- return delegate.toIndexCondition(cluster, cond, requiredColumns);
- }
-
- /** {@inheritDoc} */
- @Override public <Row> Iterable<Row> scan(
- ExecutionContext<Row> execCtx,
- ColocationGroup grp,
- Predicate<Row> filters,
- Supplier<Row> lowerIdxConditions,
- Supplier<Row> upperIdxConditions,
- Function<Row, Row> rowTransformer,
- @Nullable ImmutableBitSet requiredColumns
- ) {
- Predicate<Row> filter = row -> { filteredRows++; return true; };
-
- filters = filter.and(filters);
-
- return delegate.scan(execCtx, grp, filters, lowerIdxConditions, upperIdxConditions, rowTransformer,
- requiredColumns);
- }
+ assertEquals(1, filteredRows.get());
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index 9dfe622..f7b83b6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -25,9 +25,13 @@ import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteEx;
@@ -35,26 +39,35 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Test;
import static java.util.stream.Collectors.joining;
+import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT;
/**
*
*/
+@WithSystemProperty(key = IGNITE_CALCITE_PLANNER_TIMEOUT, value = "2000")
public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest {
/** */
+ private static final long PLANNER_TIMEOUT = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT, 0);
+
+ /** */
private static IgniteEx srv;
/** Timeout in ms for async operations. */
@@ -234,4 +247,39 @@ public class RunningQueriesIntegrationTest extends AbstractBasicIntegrationTest
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IgniteSQLException.class, "The query was cancelled while executing.");
}
+
+ /** */
+ @Test
+ public void testLongPlanningTimeout() {
+ Stream.of("T1", "T2").forEach(tblName -> {
+ sql(String.format("CREATE TABLE %s(A INT, B INT)", tblName));
+
+ IgniteTable tbl = (IgniteTable)queryProcessor(client).schemaHolder().schema("PUBLIC").getTable(tblName);
+
+ tbl.addIndex(new DelegatingIgniteIndex(tbl.getIndex(QueryUtils.PRIMARY_KEY_INDEX)) {
+ @Override public RelCollation collation() {
+ doSleep(300);
+
+ return delegate.collation();
+ }
+ });
+
+ sql(String.format("INSERT INTO %s(A, B) VALUES (1, 1)", tblName));
+ });
+
+ String longJoinQry = "SELECT * FROM T1 JOIN T2 ON T1.A = T2.A";
+
+ try {
+ AtomicReference<List<List<?>>> res = new AtomicReference<>();
+ GridTestUtils.assertTimeout(3 * PLANNER_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
+ res.set(sql(longJoinQry));
+ });
+
+ assertNotNull(res.get());
+ assertFalse(res.get().isEmpty());
+ }
+ catch (Exception e) {
+ assertTrue("Unexpected exception: " + e, e instanceof RelOptPlanner.CannotPlanException);
+ }
+ }
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
index 62b1fb0..53263d6 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/AbstractPlannerTest.java
@@ -235,41 +235,39 @@ public abstract class AbstractPlannerTest extends GridCommonAbstractTest {
/** */
protected IgniteRel physicalPlan(String sql, IgniteSchema publicSchema, String... disabledRules) throws Exception {
- return physicalPlan(sql, plannerCtx(sql, publicSchema, disabledRules));
+ return physicalPlan(plannerCtx(sql, publicSchema, disabledRules));
}
/** */
protected IgniteRel physicalPlan(String sql, Collection<IgniteSchema> schemas, String... disabledRules) throws Exception {
- return physicalPlan(sql, plannerCtx(sql, schemas, disabledRules));
+ return physicalPlan(plannerCtx(sql, schemas, disabledRules));
}
/** */
- protected IgniteRel physicalPlan(String sql, PlanningContext ctx) throws Exception {
+ protected IgniteRel physicalPlan(PlanningContext ctx) throws Exception {
try (IgnitePlanner planner = ctx.planner()) {
assertNotNull(planner);
+ assertNotNull(ctx.query());
- String qry = ctx.query();
-
- assertNotNull(qry);
-
- // Parse
- SqlNode sqlNode = planner.parse(qry);
-
- // Validate
- sqlNode = planner.validate(sqlNode);
+ return physicalPlan(planner, ctx.query());
+ }
+ }
- try {
- IgniteRel rel = PlannerHelper.optimize(sqlNode, planner, log);
+ /** */
+ protected IgniteRel physicalPlan(IgnitePlanner planner, String qry) throws Exception {
+ // Parse
+ SqlNode sqlNode = planner.parse(qry);
-// System.out.println(RelOptUtil.toString(rel));
+ // Validate
+ sqlNode = planner.validate(sqlNode);
- return rel;
- }
- catch (Throwable ex) {
- System.err.println(planner.dump());
+ try {
+ return PlannerHelper.optimize(sqlNode, planner, log);
+ }
+ catch (Throwable ex) {
+ System.err.println(planner.dump());
- throw ex;
- }
+ throw ex;
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index 7a269c4..229c99c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -162,7 +162,7 @@ public class PlannerTest extends AbstractPlannerTest {
assertNotNull(ctx);
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -270,7 +270,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(-10)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -497,7 +497,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(-10)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -716,7 +716,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(2)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -799,7 +799,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(2)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -879,7 +879,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(2)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -961,7 +961,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(2)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
@@ -1037,7 +1037,7 @@ public class PlannerTest extends AbstractPlannerTest {
.parameters(2)
.build();
- IgniteRel phys = physicalPlan(sql, ctx);
+ IgniteRel phys = physicalPlan(ctx);
assertNotNull(phys);
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTimeoutTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTimeoutTest.java
new file mode 100644
index 0000000..408926b
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTimeoutTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query.calcite.planner;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.plan.volcano.VolcanoTimeoutException;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.schema.CacheIndexImpl;
+import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Test planner timeout.
+ */
+public class PlannerTimeoutTest extends AbstractPlannerTest {
+ /** */
+ private static final long PLANNER_TIMEOUT = 1_000;
+
+ /** */
+ @Test
+ public void testLongPlanningTimeout() throws Exception {
+ IgniteSchema schema = createSchema(
+ createTestTable("T1", "A", Integer.class, "B", Integer.class),
+ createTestTable("T2", "A", Integer.class, "B", Integer.class)
+ );
+
+ String sql = "SELECT * FROM T1 JOIN T2 ON T1.A = T2.A";
+
+ PlanningContext ctx = PlanningContext.builder()
+ .parentContext(baseQueryContext(Collections.singletonList(schema)))
+ .plannerTimeout(PLANNER_TIMEOUT)
+ .query(sql)
+ .build();
+
+ AtomicReference<IgniteRel> plan = new AtomicReference<>();
+ AtomicReference<RelOptPlanner.CannotPlanException> plannerError = new AtomicReference<>();
+
+ GridTestUtils.assertTimeout(3 * PLANNER_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
+ try (IgnitePlanner planner = ctx.planner()) {
+ plan.set(physicalPlan(planner, ctx.query()));
+
+ VolcanoPlanner volcanoPlanner = (VolcanoPlanner)ctx.cluster().getPlanner();
+
+ assertNotNull(volcanoPlanner);
+
+ GridTestUtils.assertThrowsWithCause(volcanoPlanner::checkCancel, VolcanoTimeoutException.class);
+ }
+ catch (RelOptPlanner.CannotPlanException e) {
+ plannerError.set(e);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Planning failed", e);
+ }
+ });
+
+ assertTrue(plan.get() != null || plannerError.get() != null);
+
+ if (plan.get() != null) {
+ new RelVisitor() {
+ @Override public void visit(
+ RelNode node,
+ int ordinal,
+ RelNode parent
+ ) {
+ assertNotNull(node.getTraitSet().getTrait(IgniteConvention.INSTANCE.getTraitDef()));
+ super.visit(node, ordinal, parent);
+ }
+ }.go(plan.get());
+ }
+ }
+
+ /** */
+ private static TestTable createTestTable(String name, Object... cols) {
+ TestTable table = createTable(name, IgniteDistributions.broadcast(), cols);
+
+ RelCollation pkColl = TraitUtils.createCollation(Collections.singletonList(0));
+ table.addIndex(new CacheIndexImpl(pkColl, "pk", null, table) {
+ @Override public RelCollation collation() {
+ doSleep(300);
+
+ return super.collation();
+ }
+ });
+
+ return table;
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
index 524ac14..d4f75e0 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/PlannerTestSuite.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.query.calcite.planner.JoinWithUsing
import org.apache.ignite.internal.processors.query.calcite.planner.LimitOffsetPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.MergeJoinPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTest;
+import org.apache.ignite.internal.processors.query.calcite.planner.PlannerTimeoutTest;
import org.apache.ignite.internal.processors.query.calcite.planner.ProjectFilterScanMergePlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SetOpPlannerTest;
import org.apache.ignite.internal.processors.query.calcite.planner.SortAggregatePlannerTest;
@@ -69,6 +70,7 @@ import org.junit.runners.Suite;
JoinWithUsingPlannerTest.class,
ProjectFilterScanMergePlannerTest.class,
IndexRebuildPlannerTest.class,
+ PlannerTimeoutTest.class,
})
public class PlannerTestSuite {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
index 300cc82..b53c118 100644
--- a/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
+++ b/modules/core/src/main/java/org/apache/ignite/startup/cmdline/CommandLineStartup.java
@@ -108,6 +108,7 @@ public final class CommandLineStartup {
String h2TreeCls = "org.apache.ignite.internal.processors.query.h2.database.H2Tree";
String zkDiscoImpl = "org.apache.ignite.spi.discovery.zk.internal.ZookeeperDiscoveryImpl";
String zkTcpDiscoIpFinder = "org.apache.ignite.spi.discovery.tcp.ipfinder.zk.TcpDiscoveryZookeeperIpFinder";
+ String calciteQryProc = "org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor";
try {
if (U.inClassPath(h2TreeCls))
@@ -117,6 +118,9 @@ public final class CommandLineStartup {
PROPS_CLS.add(Class.forName(zkDiscoImpl));
PROPS_CLS.add(Class.forName(zkTcpDiscoIpFinder));
}
+
+ if (U.inClassPath(calciteQryProc))
+ PROPS_CLS.add(Class.forName(calciteQryProc));
}
catch (ClassNotFoundException ignored) {
// No-op.