You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/19 06:28:56 UTC
[druid] branch master updated: Convert the Druid planner to use statement handlers (#12905)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8ce03eb094 Convert the Druid planner to use statement handlers (#12905)
8ce03eb094 is described below
commit 8ce03eb094d6723b831ac03a067e2c4e3cf499fb
Author: Paul Rogers <pa...@users.noreply.github.com>
AuthorDate: Mon Sep 19 08:28:45 2022 +0200
Convert the Druid planner to use statement handlers (#12905)
* Converted Druid planner to use statement handlers
Converts the large collection of if-statements for statement
types into a set of classes: one per supported statement type.
Cleans up a few error messages.
* Revisions from review comments
* Build fix
* Build fix
* Resolve merge confict.
* More merges with QueryResponse PR
* More parameterized type cleanup
Forces a rebuild due to a flaky test
---
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 2 +-
.../org/apache/druid/msq/sql/SqlTaskResource.java | 4 +-
.../org/apache/druid/msq/exec/MSQInsertTest.java | 2 +-
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 2 +-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 8 +-
.../org/apache/druid/server/QueryLifecycle.java | 18 +-
.../org/apache/druid/server/QueryResource.java | 6 +-
.../org/apache/druid/server/QueryResponse.java | 18 +-
sql/src/main/codegen/includes/insert.ftl | 2 +-
sql/src/main/codegen/includes/replace.ftl | 2 +-
.../java/org/apache/druid/sql/DirectStatement.java | 4 +-
.../druid/sql/calcite/parser/DruidSqlIngest.java | 72 ++
.../druid/sql/calcite/parser/DruidSqlInsert.java | 40 +-
.../druid/sql/calcite/parser/DruidSqlReplace.java | 40 +-
.../druid/sql/calcite/planner/DruidPlanner.java | 904 ++-------------------
.../druid/sql/calcite/planner/IngestHandler.java | 346 ++++++++
.../druid/sql/calcite/planner/PlannerResult.java | 6 +-
.../druid/sql/calcite/planner/QueryHandler.java | 675 +++++++++++++++
.../sql/calcite/planner/SqlStatementHandler.java | 76 ++
.../apache/druid/sql/calcite/rel/DruidQuery.java | 6 +-
.../org/apache/druid/sql/calcite/rel/DruidRel.java | 4 +-
.../druid/sql/calcite/rel/DruidUnionRel.java | 14 +-
.../druid/sql/calcite/run/NativeQueryMaker.java | 22 +-
.../apache/druid/sql/calcite/run/QueryMaker.java | 2 +-
.../org/apache/druid/sql/http/SqlResource.java | 2 +-
.../druid/sql/calcite/CalciteInsertDmlTest.java | 24 +-
.../druid/sql/calcite/CalciteReplaceDmlTest.java | 16 +-
.../druid/sql/calcite/TestInsertQueryMaker.java | 2 +-
.../org/apache/druid/sql/http/SqlResourceTest.java | 10 +-
29 files changed, 1334 insertions(+), 995 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index e2754cacbe..c1611f52db 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -105,7 +105,7 @@ public class MSQTaskQueryMaker implements QueryMaker
}
@Override
- public QueryResponse runQuery(final DruidQuery druidQuery)
+ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
String taskId = MSQTasks.controllerTaskId(plannerContext.getSqlQueryId());
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
index 1725060806..f0cd7318f6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/SqlTaskResource.java
@@ -147,8 +147,8 @@ public class SqlTaskResource
final String sqlQueryId = stmt.sqlQueryId();
try {
final DirectStatement.ResultSet plan = stmt.plan();
- final QueryResponse response = plan.run();
- final Sequence sequence = response.getResults();
+ final QueryResponse<Object[]> response = plan.run();
+ final Sequence<Object[]> sequence = response.getResults();
final SqlRowTransformer rowTransformer = plan.createRowTransformer();
final boolean isTaskStruct = MSQTaskSqlEngine.TASK_STRUCT_FIELD_NAMES.equals(rowTransformer.getFieldList());
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index a1934f070a..18ad9f050d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -439,7 +439,7 @@ public class MSQInsertTest extends MSQTestBase
.setExpectedValidationErrorMatcher(CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
- "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"))
+ "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"))
))
.verifyPlanningErrors();
}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 131a9fa91b..ca7d5d3203 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -187,7 +187,7 @@ public class MSQReplaceTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
- "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL."))
+ "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table."))
)
)
.verifyPlanningErrors();
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index 10c67e27a2..12b353268a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -697,7 +697,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
- "Cannot query table [INFORMATION_SCHEMA.SCHEMATA] with SQL engine 'msq-task'."))
+ "Cannot query table INFORMATION_SCHEMA.SCHEMATA with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@@ -712,7 +712,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
- "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+ "Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@@ -727,7 +727,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
- "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+ "Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
@@ -743,7 +743,7 @@ public class MSQSelectTest extends MSQTestBase
CoreMatchers.allOf(
CoreMatchers.instanceOf(SqlPlanningException.class),
ThrowableMessageMatcher.hasMessage(CoreMatchers.startsWith(
- "Cannot query table [sys.segments] with SQL engine 'msq-task'."))
+ "Cannot query table sys.segments with SQL engine 'msq-task'."))
)
)
.verifyPlanningErrors();
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index b4d80d01d9..40e5267b80 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -128,7 +128,6 @@ public class QueryLifecycle
this.startNs = startNs;
}
-
/**
* For callers who have already authorized their query, and where simplicity is desired over flexibility. This method
* does it all in one call. Logs and metrics are emitted when the Sequence is either fully iterated or throws an
@@ -140,8 +139,7 @@ public class QueryLifecycle
*
* @return results
*/
- @SuppressWarnings("unchecked")
- public <T> QueryResponse runSimple(
+ public <T> QueryResponse<T> runSimple(
final Query<T> query,
final AuthenticationResult authenticationResult,
final Access authorizationResult
@@ -151,7 +149,7 @@ public class QueryLifecycle
final Sequence<T> results;
- final QueryResponse queryResponse;
+ final QueryResponse<T> queryResponse;
try {
preAuthorized(authenticationResult, authorizationResult);
if (!authorizationResult.isAllowed()) {
@@ -172,7 +170,7 @@ public class QueryLifecycle
* cannot be moved into execute(). We leave this as an exercise for the future, however as this oddity
* was discovered while just trying to expose HTTP response headers
*/
- return new QueryResponse(
+ return new QueryResponse<T>(
Sequences.wrap(
results,
new SequenceWrapper()
@@ -193,8 +191,7 @@ public class QueryLifecycle
*
* @param baseQuery the query
*/
- @SuppressWarnings("unchecked")
- public void initialize(final Query baseQuery)
+ public void initialize(final Query<?> baseQuery)
{
transition(State.NEW, State.INITIALIZED);
@@ -282,17 +279,18 @@ public class QueryLifecycle
*
* @return result sequence and response context
*/
- public QueryResponse execute()
+ public <T> QueryResponse<T> execute()
{
transition(State.AUTHORIZED, State.EXECUTING);
final ResponseContext responseContext = DirectDruidClient.makeResponseContextForQuery();
- final Sequence<?> res = QueryPlus.wrap(baseQuery)
+ @SuppressWarnings("unchecked")
+ final Sequence<T> res = QueryPlus.wrap((Query<T>) baseQuery)
.withIdentity(authenticationResult.getIdentity())
.run(texasRanger, responseContext);
- return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
+ return new QueryResponse<T>(res == null ? Sequences.empty() : res, responseContext);
}
/**
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index ea225efbae..1a72cfc3b8 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -200,7 +200,7 @@ public class QueryResource implements QueryCountStatsProvider
throw new ForbiddenException(authResult.toString());
}
- final QueryResponse queryResponse = queryLifecycle.execute();
+ final QueryResponse<?> queryResponse = queryLifecycle.execute();
final Sequence<?> results = queryResponse.getResults();
final ResponseContext responseContext = queryResponse.getResponseContext();
final String prevEtag = getPreviousEtag(req);
@@ -477,8 +477,8 @@ public class QueryResource implements QueryCountStatsProvider
}
ObjectWriter newOutputWriter(
- @Nullable QueryToolChest toolChest,
- @Nullable Query query,
+ @Nullable QueryToolChest<?, Query<?>> toolChest,
+ @Nullable Query<?> query,
boolean serializeDateTimeAsLong
)
{
diff --git a/server/src/main/java/org/apache/druid/server/QueryResponse.java b/server/src/main/java/org/apache/druid/server/QueryResponse.java
index 69908aee94..d590c5014e 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResponse.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResponse.java
@@ -22,23 +22,23 @@ package org.apache.druid.server;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.context.ResponseContext;
-public class QueryResponse
+public class QueryResponse<T>
{
- public static QueryResponse withEmptyContext(Sequence results)
- {
- return new QueryResponse(results, ResponseContext.createEmpty());
- }
-
- private final Sequence results;
+ private final Sequence<T> results;
private final ResponseContext responseContext;
- public QueryResponse(final Sequence results, final ResponseContext responseContext)
+ public QueryResponse(final Sequence<T> results, final ResponseContext responseContext)
{
this.results = results;
this.responseContext = responseContext;
}
- public Sequence getResults()
+ public static <T> QueryResponse<T> withEmptyContext(Sequence<T> results)
+ {
+ return new QueryResponse<T>(results, ResponseContext.createEmpty());
+ }
+
+ public Sequence<T> getResults()
{
return results;
}
diff --git a/sql/src/main/codegen/includes/insert.ftl b/sql/src/main/codegen/includes/insert.ftl
index 07898aaadc..c0e04bc772 100644
--- a/sql/src/main/codegen/includes/insert.ftl
+++ b/sql/src/main/codegen/includes/insert.ftl
@@ -38,7 +38,7 @@ SqlNode DruidSqlInsertEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
- throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
+ throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
diff --git a/sql/src/main/codegen/includes/replace.ftl b/sql/src/main/codegen/includes/replace.ftl
index ed9eee46de..ed8dbb10ee 100644
--- a/sql/src/main/codegen/includes/replace.ftl
+++ b/sql/src/main/codegen/includes/replace.ftl
@@ -58,7 +58,7 @@ SqlNode DruidSqlReplaceEof() :
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
- throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause");
+ throw new ParseException("CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause");
}
}
// EOF is also present in SqlStmtEof but EOF is a special case and a single EOF can be consumed multiple times.
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index e173213a0c..507216c236 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -98,7 +98,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
* Do the actual execute step which allows subclasses to wrap the sequence,
* as is sometimes needed for testing.
*/
- public QueryResponse run()
+ public QueryResponse<Object[]> run()
{
try {
// Check cancellation. Required for SqlResourceTest to work.
@@ -176,7 +176,7 @@ public class DirectStatement extends AbstractStatement implements Cancelable
*
* @return sequence which delivers query results
*/
- public QueryResponse execute()
+ public QueryResponse<Object[]> execute()
{
return plan().run();
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
new file mode 100644
index 0000000000..26f019e0a1
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlIngest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.parser;
+
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.druid.java.util.common.granularity.Granularity;
+
+import javax.annotation.Nullable;
+
+/**
+ * Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
+ * Allows Planner code to work with these two statements generically where they
+ * share common clauses.
+ */
+public abstract class DruidSqlIngest extends SqlInsert
+{
+ protected final Granularity partitionedBy;
+
+ // Used in the unparse function to generate the original query since we convert the string to an enum
+ protected final String partitionedByStringForUnparse;
+
+ @Nullable
+ protected final SqlNodeList clusteredBy;
+
+ public DruidSqlIngest(SqlParserPos pos,
+ SqlNodeList keywords,
+ SqlNode targetTable,
+ SqlNode source,
+ SqlNodeList columnList,
+ @Nullable Granularity partitionedBy,
+ @Nullable String partitionedByStringForUnparse,
+ @Nullable SqlNodeList clusteredBy
+ )
+ {
+ super(pos, keywords, targetTable, source, columnList);
+
+ this.partitionedByStringForUnparse = partitionedByStringForUnparse;
+ this.partitionedBy = partitionedBy;
+ this.clusteredBy = clusteredBy;
+ }
+
+ public Granularity getPartitionedBy()
+ {
+ return partitionedBy;
+ }
+
+ @Nullable
+ public SqlNodeList getClusteredBy()
+ {
+ return clusteredBy;
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
index dbaace1d93..c2eeb2ed1e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlInsert.java
@@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.parser;
-import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@@ -32,24 +31,16 @@ import javax.annotation.Nullable;
/**
* Extends the 'insert' call to hold custom parameters specific to Druid i.e. PARTITIONED BY and CLUSTERED BY
- * This class extends the {@link SqlInsert} so that this SqlNode can be used in
+ * This class extends the {@link DruidSqlIngest} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
-public class DruidSqlInsert extends SqlInsert
+public class DruidSqlInsert extends DruidSqlIngest
{
public static final String SQL_INSERT_SEGMENT_GRANULARITY = "sqlInsertSegmentGranularity";
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;
- private final Granularity partitionedBy;
-
- // Used in the unparse function to generate the original query since we convert the string to an enum
- private final String partitionedByStringForUnparse;
-
- @Nullable
- private final SqlNodeList clusteredBy;
-
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -61,35 +52,18 @@ public class DruidSqlInsert extends SqlInsert
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy
- ) throws ParseException
+ )
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
- insertNode.getTargetColumnList()
+ insertNode.getTargetColumnList(),
+ partitionedBy,
+ partitionedByStringForUnparse,
+ clusteredBy
);
- if (partitionedBy == null) {
- throw new ParseException("INSERT statements must specify PARTITIONED BY clause explicitly");
- }
- this.partitionedBy = partitionedBy;
-
- Preconditions.checkNotNull(partitionedByStringForUnparse);
- this.partitionedByStringForUnparse = partitionedByStringForUnparse;
-
- this.clusteredBy = clusteredBy;
- }
-
- @Nullable
- public SqlNodeList getClusteredBy()
- {
- return clusteredBy;
- }
-
- public Granularity getPartitionedBy()
- {
- return partitionedBy;
}
@Nonnull
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
index fb41bf7656..d527a08b59 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/parser/DruidSqlReplace.java
@@ -19,7 +19,6 @@
package org.apache.druid.sql.calcite.parser;
-import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
@@ -38,22 +37,14 @@ import javax.annotation.Nullable;
* This class extends the {@link SqlInsert} so that this SqlNode can be used in
* {@link org.apache.calcite.sql2rel.SqlToRelConverter} for getting converted into RelNode, and further processing
*/
-public class DruidSqlReplace extends SqlInsert
+public class DruidSqlReplace extends DruidSqlIngest
{
public static final String SQL_REPLACE_TIME_CHUNKS = "sqlReplaceTimeChunks";
public static final SqlOperator OPERATOR = new SqlSpecialOperator("REPLACE", SqlKind.OTHER);
- private final Granularity partitionedBy;
-
- // Used in the unparse function to generate the original query since we convert the string to an enum
- private final String partitionedByStringForUnparse;
-
private final SqlNode replaceTimeQuery;
- @Nullable
- private final SqlNodeList clusteredBy;
-
/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
@@ -66,28 +57,20 @@ public class DruidSqlReplace extends SqlInsert
@Nullable String partitionedByStringForUnparse,
@Nullable SqlNodeList clusteredBy,
@Nullable SqlNode replaceTimeQuery
- ) throws ParseException
+ )
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
- insertNode.getTargetColumnList()
+ insertNode.getTargetColumnList(),
+ partitionedBy,
+ partitionedByStringForUnparse,
+ clusteredBy
);
- if (replaceTimeQuery == null) {
- throw new ParseException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.");
- }
- if (partitionedBy == null) {
- throw new ParseException("REPLACE statements must specify PARTITIONED BY clause explicitly");
- }
- this.partitionedBy = partitionedBy;
-
- this.partitionedByStringForUnparse = Preconditions.checkNotNull(partitionedByStringForUnparse);
this.replaceTimeQuery = replaceTimeQuery;
-
- this.clusteredBy = clusteredBy;
}
public SqlNode getReplaceTimeQuery()
@@ -95,17 +78,6 @@ public class DruidSqlReplace extends SqlInsert
return replaceTimeQuery;
}
- public Granularity getPartitionedBy()
- {
- return partitionedBy;
- }
-
- @Nullable
- public SqlNodeList getClusteredBy()
- {
- return clusteredBy;
- }
-
@Nonnull
@Override
public SqlOperator getOperator()
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
index 75be75855c..68db56dd37 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java
@@ -19,90 +19,31 @@
package org.apache.druid.sql.calcite.planner;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Suppliers;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.BindableConvention;
-import org.apache.calcite.interpreter.BindableRel;
-import org.apache.calcite.interpreter.Bindables;
-import org.apache.calcite.jdbc.CalciteSchema;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelRoot;
-import org.apache.calcite.rel.RelVisitor;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalSort;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExplain;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlInsert;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.sql.SqlOrderBy;
import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Pair;
-import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularity;
-import org.apache.druid.java.util.common.guava.BaseSequence;
-import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.query.Query;
-import org.apache.druid.segment.DimensionHandlerUtils;
-import org.apache.druid.server.QueryResponse;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
-import org.apache.druid.sql.calcite.rel.DruidConvention;
-import org.apache.druid.sql.calcite.rel.DruidQuery;
-import org.apache.druid.sql.calcite.rel.DruidRel;
-import org.apache.druid.sql.calcite.rel.DruidUnionRel;
-import org.apache.druid.sql.calcite.run.EngineFeature;
-import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
-import org.apache.druid.sql.calcite.table.DruidTable;
-import org.apache.druid.utils.Throwables;
import org.joda.time.DateTimeZone;
-import javax.annotation.Nullable;
import java.io.Closeable;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import java.util.function.Function;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/**
* Druid SQL planner. Wraps the underlying Calcite planner with Druid-specific
@@ -121,27 +62,13 @@ public class DruidPlanner implements Closeable
START, VALIDATED, PREPARED, PLANNED
}
- private static final EmittingLogger log = new EmittingLogger(DruidPlanner.class);
- private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
- @VisibleForTesting
- public static final String UNNAMED_INGESTION_COLUMN_ERROR =
- "Cannot ingest expressions that do not have an alias "
- + "or columns with names like EXPR$[digit].\n"
- + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as "
- + "\"func(X) as myColumn\"";
-
private final FrameworkConfig frameworkConfig;
private final CalcitePlanner planner;
private final PlannerContext plannerContext;
private final SqlEngine engine;
private State state = State.START;
- private ParsedNodes parsed;
- private SqlNode validatedQueryNode;
+ private SqlStatementHandler handler;
private boolean authorized;
- private PrepareResult prepareResult;
- private Set<ResourceAction> resourceActions;
- private RelRoot rootQueryRel;
- private RexBuilder rexBuilder;
DruidPlanner(
final FrameworkConfig frameworkConfig,
@@ -170,80 +97,44 @@ public class DruidPlanner implements Closeable
// Parse the query string.
SqlNode root = planner.parse(plannerContext.getSql());
- parsed = ParsedNodes.create(root, plannerContext.getTimeZone());
-
- if (parsed.isSelect() && !plannerContext.engineHasFeature(EngineFeature.CAN_SELECT)) {
- throw new ValidationException(StringUtils.format("Cannot execute SELECT with SQL engine '%s'.", engine.name()));
- } else if (parsed.isInsert() && !plannerContext.engineHasFeature(EngineFeature.CAN_INSERT)) {
- throw new ValidationException(StringUtils.format("Cannot execute INSERT with SQL engine '%s'.", engine.name()));
- } else if (parsed.isReplace() && !plannerContext.engineHasFeature(EngineFeature.CAN_REPLACE)) {
- throw new ValidationException(StringUtils.format("Cannot execute REPLACE with SQL engine '%s'.", engine.name()));
- }
+ handler = createHandler(root);
try {
- if (parsed.getIngestionGranularity() != null) {
- plannerContext.getQueryContext().addSystemParam(
- DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
- plannerContext.getJsonMapper().writeValueAsString(parsed.getIngestionGranularity())
- );
- }
- }
- catch (JsonProcessingException e) {
- throw new ValidationException("Unable to serialize partition granularity.");
- }
-
- if (parsed.getReplaceIntervals() != null) {
- plannerContext.getQueryContext().addSystemParam(
- DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
- String.join(",", parsed.getReplaceIntervals())
- );
- }
-
- try {
- // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
- // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
- // replacement.
- //
- // Parameter replacement is done only if the client provides parameter values.
- // If this is a PREPARE-only, then there will be no values even if the statement contains
- // parameters. If this is a PLAN, then we'll catch later the case that the statement
- // contains parameters, but no values were provided.
- SqlNode queryNode = parsed.getQueryNode();
- if (!plannerContext.getParameters().isEmpty()) {
- queryNode = queryNode.accept(new SqlParameterizerShuttle(plannerContext));
- }
- validatedQueryNode = planner.validate(queryNode);
+ handler.validate();
+ plannerContext.setResourceActions(handler.resourceActions());
}
catch (RuntimeException e) {
throw new ValidationException(e);
}
- final SqlValidator validator = planner.getValidator();
- SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(validator, plannerContext);
- validatedQueryNode.accept(resourceCollectorShuttle);
-
- resourceActions = new HashSet<>(resourceCollectorShuttle.getResourceActions());
+ state = State.VALIDATED;
+ }
- if (parsed.isInsert() || parsed.isReplace()) {
- // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
- // the number of rows inserted to be limited which is likely to be confusing and unintended.
- if (plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
- throw new ValidationException(
- StringUtils.format(
- "%s cannot be provided with %s.",
- PlannerContext.CTX_SQL_OUTER_LIMIT,
- parsed.getInsertOrReplace().getOperator().getName()
- )
- );
+ private SqlStatementHandler createHandler(final SqlNode node) throws ValidationException
+ {
+ SqlNode query = node;
+ SqlExplain explain = null;
+ if (query.getKind() == SqlKind.EXPLAIN) {
+ explain = (SqlExplain) query;
+ query = explain.getExplicandum();
+ }
+
+ SqlStatementHandler.HandlerContext handlerContext = new HandlerContextImpl();
+ if (query.getKind() == SqlKind.INSERT) {
+ if (query instanceof DruidSqlInsert) {
+ return new IngestHandler.InsertHandler(handlerContext, (DruidSqlInsert) query, explain);
+ } else if (query instanceof DruidSqlReplace) {
+ return new IngestHandler.ReplaceHandler(handlerContext, (DruidSqlReplace) query, explain);
}
- final String targetDataSource = validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
- resourceActions.add(new ResourceAction(new Resource(targetDataSource, ResourceType.DATASOURCE), Action.WRITE));
}
- state = State.VALIDATED;
- plannerContext.setResourceActions(resourceActions);
+ if (query.isA(SqlKind.QUERY)) {
+ return new QueryHandler.SelectHandler(handlerContext, query, explain);
+ }
+ throw new ValidationException(StringUtils.format("Cannot execute [%s].", node.getKind()));
}
+
/**
* Prepare a SQL query for execution, including some initial parsing and
* validation and any dynamic parameter type resolution, to support prepared
@@ -260,30 +151,9 @@ public class DruidPlanner implements Closeable
public PrepareResult prepare()
{
Preconditions.checkState(state == State.VALIDATED);
-
- rootQueryRel = planner.rel(validatedQueryNode);
- doPrepare();
+ handler.prepare();
state = State.PREPARED;
- return prepareResult;
- }
-
- private void doPrepare()
- {
- final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
- final SqlValidator validator = planner.getValidator();
- final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode);
- final RelDataType returnedRowType;
-
- if (parsed.getExplainNode() != null) {
- returnedRowType = getExplainStructType(typeFactory);
- } else if (parsed.isSelect()) {
- returnedRowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
- } else {
- assert parsed.insertOrReplace != null;
- returnedRowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
- }
-
- prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes);
+ return prepareResult();
}
/**
@@ -309,14 +179,14 @@ public class DruidPlanner implements Closeable
/**
* Return the resource actions corresponding to the datasources and views which
- * an authenticated request must be authorized for to process the
- * query. The actions will be {@code null} if the
- * planner has not yet advanced to the validation step. This may occur if
- * validation fails and the caller accesses the resource
- * actions as part of clean-up.
+ * an authenticated request must be authorized for to process the query. The
+ * actions will be {@code null} if the planner has not yet advanced to the
+ * validation step. This may occur if validation fails and the caller accesses
+ * the resource actions as part of clean-up.
*/
public Set<ResourceAction> resourceActions(boolean includeContext)
{
+ Set<ResourceAction> resourceActions = plannerContext.getResourceActions();
if (includeContext) {
Set<ResourceAction> actions = new HashSet<>(resourceActions);
plannerContext.getQueryContext().getUserParams().keySet().forEach(contextParam -> actions.add(
@@ -330,69 +200,18 @@ public class DruidPlanner implements Closeable
/**
* Plan an SQL query for execution, returning a {@link PlannerResult} which can be used to actually execute the query.
- *
- * Ideally, the query can be planned into a native Druid query, using {@link #planWithDruidConvention}, but will
- * fall-back to {@link #planWithBindableConvention} if this is not possible.
- *
- * Planning reuses the validation done in `validate()` which must be called first.
+ * <p>
+ * Ideally, the query can be planned into a native Druid query, but will
+ * fall-back to bindable convention if this is not possible.
+ * <p>
+ * Planning reuses the validation done in {@code validate()} which must be called first.
*/
public PlannerResult plan() throws ValidationException
{
Preconditions.checkState(state == State.VALIDATED || state == State.PREPARED);
Preconditions.checkState(authorized);
- if (state == State.VALIDATED) {
- rootQueryRel = planner.rel(validatedQueryNode);
- }
-
- final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
-
- // the planner's type factory is not available until after parsing
- this.rexBuilder = new RexBuilder(planner.getTypeFactory());
state = State.PLANNED;
-
- try {
- if (!bindableTables.isEmpty()) {
- // Consider BINDABLE convention when necessary. Used for metadata tables.
-
- if (parsed.isInsert() || parsed.isReplace()) {
- // Throws ValidationException if the target table is itself bindable.
- validateAndGetDataSourceForIngest(parsed.getInsertOrReplace());
- }
-
- if (!plannerContext.engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
- throw new ValidationException(
- StringUtils.format(
- "Cannot query table%s [%s] with SQL engine '%s'.",
- bindableTables.size() != 1 ? "s" : "",
- bindableTables.stream()
- .map(table -> Joiner.on(".").join(table.getQualifiedName()))
- .collect(Collectors.joining(", ")),
- engine.name()
- )
- );
- }
-
- return planWithBindableConvention(rootQueryRel, parsed.getExplainNode());
- } else {
- // DRUID convention is used whenever there are no tables that require BINDABLE.
- return planWithDruidConvention(rootQueryRel, parsed.getExplainNode(), parsed.getInsertOrReplace());
- }
- }
- catch (Exception e) {
- Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
- if (null == cannotPlanException) {
- // Not a CannotPlanException, rethrow without logging.
- throw e;
- }
-
- Logger logger = log;
- if (!plannerContext.getQueryContext().isDebug()) {
- logger = log.noStackTrace();
- }
- String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
- logger.warn(e, errorMessage);
- throw new UnsupportedSQLQueryException(errorMessage);
- }
+ return handler.plan();
}
public PlannerContext getPlannerContext()
@@ -402,7 +221,7 @@ public class DruidPlanner implements Closeable
public PrepareResult prepareResult()
{
- return prepareResult;
+ return handler.prepareResult();
}
@Override
@@ -411,647 +230,48 @@ public class DruidPlanner implements Closeable
planner.close();
}
- /**
- * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
- */
- private PlannerResult planWithDruidConvention(
- final RelRoot root,
- @Nullable final SqlExplain explain,
- @Nullable final SqlInsert insertOrReplace
- ) throws ValidationException
+ protected class HandlerContextImpl implements SqlStatementHandler.HandlerContext
{
- final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(root);
- final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot, insertOrReplace);
- plannerContext.setQueryMaker(queryMaker);
- if (prepareResult == null) {
- doPrepare();
- }
-
- // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
- // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
- // successfully substitute all parameter values, and will cause a failure if any
- // dynamic a parameters are not bound. This occurs at least for DATE parameters
- // with integer values.
- //
- // This check also catches the case where we did not do a parameter check earlier
- // because no values were provided. (Values are not required in the PREPARE case
- // but now that we're planning, we require them.)
- RelNode parameterized = possiblyLimitedRoot.rel.accept(
- new RelParameterizerShuttle(plannerContext)
- );
- final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
- CalciteRulesManager.DRUID_CONVENTION_RULES,
- planner.getEmptyTraitSet()
- .replace(DruidConvention.instance())
- .plus(root.collation),
- parameterized
- );
-
- if (explain != null) {
- return planExplanation(druidRel, explain, true);
- } else {
- // Compute row type.
- final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
- final RelDataType rowType;
-
- if (parsed.isSelect()) {
- rowType = engine.resultTypeForSelect(typeFactory, rootQueryRel.validatedRowType);
- } else {
- assert parsed.insertOrReplace != null;
- rowType = engine.resultTypeForInsert(typeFactory, rootQueryRel.validatedRowType);
- }
-
- // Start the query.
- final Supplier<QueryResponse> resultsSupplier = () -> {
- // sanity check
- final Set<ResourceAction> readResourceActions =
- plannerContext.getResourceActions()
- .stream()
- .filter(action -> action.getAction() == Action.READ)
- .collect(Collectors.toSet());
- Preconditions.checkState(
- readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
- // The resources found in the plannerContext can be less than the datasources in
- // the query plan, because the query planner can eliminate empty tables by replacing
- // them with InlineDataSource of empty rows.
- || readResourceActions.size() >= druidRel.getDataSourceNames().size(),
- "Authorization sanity check failed"
- );
-
- return druidRel.runQuery();
- };
-
- return new PlannerResult(resultsSupplier, rowType);
- }
- }
-
- /**
- * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
- * things that are not directly translatable to native Druid queries such
- * as system tables and just a general purpose (but definitely not optimized)
- * fall-back.
- *
- * See {@link #planWithDruidConvention} which will handle things which are
- * directly translatable to native Druid queries.
- *
- * The bindable path handles parameter substitution of any values not
- * bound by the earlier steps.
- */
- private PlannerResult planWithBindableConvention(
- final RelRoot root,
- @Nullable final SqlExplain explain
- )
- {
- if (prepareResult == null) {
- doPrepare();
- }
-
- BindableRel bindableRel = (BindableRel) planner.transform(
- CalciteRulesManager.BINDABLE_CONVENTION_RULES,
- planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(root.collation),
- root.rel
- );
-
- if (!root.isRefTrivial()) {
- // Add a projection on top to accommodate root.fields.
- final List<RexNode> projects = new ArrayList<>();
- final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
- for (int field : Pair.left(root.fields)) {
- projects.add(rexBuilder.makeInputRef(bindableRel, field));
- }
- bindableRel = new Bindables.BindableProject(
- bindableRel.getCluster(),
- bindableRel.getTraitSet(),
- bindableRel,
- projects,
- root.validatedRowType
- );
- }
-
- if (explain != null) {
- return planExplanation(bindableRel, explain, false);
- } else {
- final BindableRel theRel = bindableRel;
- final DataContext dataContext = plannerContext.createDataContext(
- planner.getTypeFactory(),
- plannerContext.getParameters()
- );
- final Supplier<QueryResponse> resultsSupplier = () -> {
- final Enumerable<?> enumerable = theRel.bind(dataContext);
- final Enumerator<?> enumerator = enumerable.enumerator();
- return QueryResponse.withEmptyContext(Sequences.withBaggage(
- new BaseSequence<>(
- new BaseSequence.IteratorMaker<Object[], EnumeratorIterator<Object[]>>()
- {
- @Override
- public EnumeratorIterator<Object[]> make()
- {
- return new EnumeratorIterator<>(new Iterator<Object[]>()
- {
- @Override
- public boolean hasNext()
- {
- return enumerator.moveNext();
- }
-
- @Override
- public Object[] next()
- {
- return (Object[]) enumerator.current();
- }
- });
- }
-
- @Override
- public void cleanup(EnumeratorIterator<Object[]> iterFromMake)
- {
-
- }
- }
- ), enumerator::close)
- );
- };
- return new PlannerResult(resultsSupplier, root.validatedRowType);
- }
- }
-
- /**
- * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
- */
- private PlannerResult planExplanation(
- final RelNode rel,
- final SqlExplain explain,
- final boolean isDruidConventionExplanation
- )
- {
- String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
- String resourcesString;
- try {
- if (isDruidConventionExplanation && rel instanceof DruidRel) {
- // Show the native queries instead of Calcite's explain if the legacy flag is turned off
- if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
- DruidRel<?> druidRel = (DruidRel<?>) rel;
- try {
- explanation = explainSqlPlanAsNativeQueries(druidRel);
- }
- catch (Exception ex) {
- log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan");
- }
- }
- }
- final Set<Resource> resources =
- plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
- resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
- }
- catch (JsonProcessingException jpe) {
- // this should never happen, we create the Resources here, not a user
- log.error(jpe, "Encountered exception while serializing Resources for explain output");
- resourcesString = null;
- }
- final Supplier<QueryResponse> resultsSupplier = Suppliers.ofInstance(
- QueryResponse.withEmptyContext(Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString})))
- );
- return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
- }
-
- /**
- * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose
- * and not indicative of the native Druid Queries which will get executed
- * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it
- *
- * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
- * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
- * @throws JsonProcessingException
- */
- private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
- {
- ObjectMapper jsonMapper = plannerContext.getJsonMapper();
- List<DruidQuery> druidQueryList;
- druidQueryList = flattenOutermostRel(rel)
- .stream()
- .map(druidRel -> druidRel.toDruidQuery(false))
- .collect(Collectors.toList());
-
-
- // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
- // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
- // serializing it using normal list method.
- ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
-
- for (DruidQuery druidQuery : druidQueryList) {
- Query<?> nativeQuery = druidQuery.getQuery();
- ObjectNode objectNode = jsonMapper.createObjectNode();
- objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
- objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
- nativeQueriesArrayNode.add(objectNode);
- }
-
- return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
- }
-
- /**
- * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel}
- * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
- * node
- * For eg, a DruidRel structure of kind:
- * DruidUnionRel
- * DruidUnionRel
- * DruidRel (A)
- * DruidRel (B)
- * DruidRel(C)
- * will return [DruidRel(A), DruidRel(B), DruidRel(C)]
- *
- * @param outermostDruidRel The outermost rel which is to be flattened
- * @return a list of DruidRel's which donot have a DruidUnionRel nested in between them
- */
- private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
- {
- List<DruidRel<?>> druidRels = new ArrayList<>();
- flattenOutermostRel(outermostDruidRel, druidRels);
- return druidRels;
- }
-
- /**
- * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
- * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
- * nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
- *
- * @param druidRel The current relNode
- * @param flattendListAccumulator Accumulator list which needs to be appended by this method
- */
- private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
- {
- if (druidRel instanceof DruidUnionRel) {
- DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
- druidUnionRel.getInputs().forEach(innerRelNode -> {
- DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
- flattenOutermostRel(innerDruidRelNode, flattendListAccumulator);
- });
- } else {
- flattendListAccumulator.add(druidRel);
- }
- }
-
- /**
- * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
- * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
- * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
- *
- * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
- * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
- *
- * @param root root node
- *
- * @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
- */
- @Nullable
- private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
- {
- Object outerLimitObj = plannerContext.getQueryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
- Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
- if (outerLimit == null) {
- return root;
- }
-
- final LogicalSort newRootRel;
-
- if (root.rel instanceof Sort) {
- Sort sort = (Sort) root.rel;
-
- final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
- final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
-
- if (newOffsetLimit.equals(originalOffsetLimit)) {
- // nothing to do, don't bother to make a new sort
- return root;
- }
-
- newRootRel = LogicalSort.create(
- sort.getInput(),
- sort.collation,
- newOffsetLimit.getOffsetAsRexNode(rexBuilder),
- newOffsetLimit.getLimitAsRexNode(rexBuilder)
- );
- } else {
- newRootRel = LogicalSort.create(
- root.rel,
- root.collation,
- null,
- new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
- );
- }
-
- return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
- }
-
- private QueryMaker buildQueryMaker(
- final RelRoot rootQueryRel,
- @Nullable final SqlInsert insertOrReplace
- ) throws ValidationException
- {
- if (insertOrReplace != null) {
- final String targetDataSource = validateAndGetDataSourceForIngest(insertOrReplace);
- validateColumnsForIngestion(rootQueryRel);
- return engine.buildQueryMakerForInsert(targetDataSource, rootQueryRel, plannerContext);
- } else {
- return engine.buildQueryMakerForSelect(rootQueryRel, plannerContext);
- }
- }
-
- private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
- {
- return typeFactory.createStructType(
- ImmutableList.of(
- Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR),
- Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
- ),
- ImmutableList.of("PLAN", "RESOURCES")
- );
- }
-
- /**
- * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
- * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
- */
- private String validateAndGetDataSourceForIngest(final SqlInsert insert) throws ValidationException
- {
- final String operatorName = insert.getOperator().getName();
- if (insert.isUpsert()) {
- throw new ValidationException("UPSERT is not supported.");
- }
-
- if (insert.getTargetColumnList() != null) {
- throw new ValidationException(operatorName + " with target column list is not supported.");
- }
-
- final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
- final String dataSource;
-
- if (tableIdentifier.names.isEmpty()) {
- // I don't think this can happen, but include a branch for it just in case.
- throw new ValidationException(operatorName + " requires target table.");
- } else if (tableIdentifier.names.size() == 1) {
- // Unqualified name.
- dataSource = Iterables.getOnlyElement(tableIdentifier.names);
- } else {
- // Qualified name.
- final String defaultSchemaName =
- Iterables.getOnlyElement(CalciteSchema.from(frameworkConfig.getDefaultSchema()).path(null));
-
- if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
- dataSource = tableIdentifier.names.get(1);
- } else {
- throw new ValidationException(
- StringUtils.format(
- "Cannot %s into [%s] because it is not a Druid datasource (schema = %s).",
- operatorName,
- tableIdentifier,
- defaultSchemaName
- )
- );
- }
- }
-
- try {
- IdUtils.validateId(operatorName + " dataSource", dataSource);
- }
- catch (IllegalArgumentException e) {
- throw new ValidationException(e.getMessage());
- }
-
- return dataSource;
- }
-
- private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
- {
- // Check that there are no unnamed columns in the insert.
- for (Pair<Integer, String> field : rootQueryRel.fields) {
- if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
- throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR);
- }
- }
- }
-
- private String buildSQLPlanningErrorMessage(Throwable exception)
- {
- String errorMessage = plannerContext.getPlanningError();
- if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
- errorMessage = exception.getMessage();
- }
- if (null == errorMessage) {
- errorMessage = "Please check Broker logs for additional details.";
- } else {
- // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
- errorMessage = "Possible error: " + errorMessage;
- }
- // Finally, add the query itself to error message that user will get.
- return StringUtils.format("Query not supported. %s SQL was: %s", errorMessage, plannerContext.getSql());
- }
-
- private static Set<RelOptTable> getBindableTables(final RelNode relNode)
- {
- class HasBindableVisitor extends RelVisitor
- {
- private final Set<RelOptTable> found = new HashSet<>();
-
- @Override
- public void visit(RelNode node, int ordinal, RelNode parent)
- {
- if (node instanceof TableScan) {
- RelOptTable table = node.getTable();
- if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
- found.add(table);
- return;
- }
- }
-
- super.visit(node, ordinal, parent);
- }
- }
-
- final HasBindableVisitor visitor = new HasBindableVisitor();
- visitor.go(relNode);
- return visitor.found;
- }
-
- private static class EnumeratorIterator<T> implements Iterator<T>
- {
- private final Iterator<T> it;
-
- EnumeratorIterator(Iterator<T> it)
- {
- this.it = it;
- }
-
@Override
- public boolean hasNext()
+ public PlannerContext plannerContext()
{
- return it.hasNext();
+ return plannerContext;
}
@Override
- public T next()
- {
- return it.next();
- }
- }
-
- private static class ParsedNodes
- {
- @Nullable
- private final SqlExplain explain;
-
- @Nullable
- private final SqlInsert insertOrReplace;
-
- private final SqlNode query;
-
- @Nullable
- private final Granularity ingestionGranularity;
-
- @Nullable
- private final List<String> replaceIntervals;
-
- private ParsedNodes(
- @Nullable SqlExplain explain,
- @Nullable SqlInsert insertOrReplace,
- SqlNode query,
- @Nullable Granularity ingestionGranularity,
- @Nullable List<String> replaceIntervals
- )
- {
- this.explain = explain;
- this.insertOrReplace = insertOrReplace;
- this.query = query;
- this.ingestionGranularity = ingestionGranularity;
- this.replaceIntervals = replaceIntervals;
- }
-
- static ParsedNodes create(final SqlNode node, DateTimeZone dateTimeZone) throws ValidationException
+ public SqlEngine engine()
{
- SqlNode query = node;
- SqlExplain explain = null;
- if (query.getKind() == SqlKind.EXPLAIN) {
- explain = (SqlExplain) query;
- query = explain.getExplicandum();
- }
-
- if (query.getKind() == SqlKind.INSERT) {
- if (query instanceof DruidSqlInsert) {
- return handleInsert(explain, (DruidSqlInsert) query);
- } else if (query instanceof DruidSqlReplace) {
- return handleReplace(explain, (DruidSqlReplace) query, dateTimeZone);
- }
- }
-
- if (!query.isA(SqlKind.QUERY)) {
- throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
- }
-
- return new ParsedNodes(explain, null, query, null, null);
+ return engine;
}
- static ParsedNodes handleInsert(SqlExplain explain, DruidSqlInsert druidSqlInsert) throws ValidationException
- {
- SqlNode query = druidSqlInsert.getSource();
-
- // Check if ORDER BY clause is not provided to the underlying query
- if (query instanceof SqlOrderBy) {
- SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- SqlNodeList orderByList = sqlOrderBy.orderList;
- if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
- throw new ValidationException("Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.");
- }
- }
-
- Granularity ingestionGranularity = druidSqlInsert.getPartitionedBy();
-
- if (druidSqlInsert.getClusteredBy() != null) {
- query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlInsert.getClusteredBy());
- }
-
- if (!query.isA(SqlKind.QUERY)) {
- throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
- }
-
- return new ParsedNodes(explain, druidSqlInsert, query, ingestionGranularity, null);
- }
-
- static ParsedNodes handleReplace(SqlExplain explain, DruidSqlReplace druidSqlReplace, DateTimeZone dateTimeZone)
- throws ValidationException
- {
- SqlNode query = druidSqlReplace.getSource();
-
- // Check if ORDER BY clause is not provided to the underlying query
- if (query instanceof SqlOrderBy) {
- SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
- SqlNodeList orderByList = sqlOrderBy.orderList;
- if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
- throw new ValidationException("Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.");
- }
- }
-
- SqlNode replaceTimeQuery = druidSqlReplace.getReplaceTimeQuery();
- if (replaceTimeQuery == null) {
- throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.");
- }
-
- Granularity ingestionGranularity = druidSqlReplace.getPartitionedBy();
- List<String> replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(replaceTimeQuery, ingestionGranularity, dateTimeZone);
-
- if (druidSqlReplace.getClusteredBy() != null) {
- query = DruidSqlParserUtils.convertClusterByToOrderBy(query, druidSqlReplace.getClusteredBy());
- }
-
- if (!query.isA(SqlKind.QUERY)) {
- throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
- }
-
- return new ParsedNodes(explain, druidSqlReplace, query, ingestionGranularity, replaceIntervals);
- }
-
- @Nullable
- public SqlExplain getExplainNode()
- {
- return explain;
- }
-
- public boolean isSelect()
- {
- return insertOrReplace == null;
- }
-
- public boolean isInsert()
- {
- return insertOrReplace != null && !isReplace();
- }
-
- public boolean isReplace()
+ @Override
+ public CalcitePlanner planner()
{
- return insertOrReplace instanceof DruidSqlReplace;
+ return planner;
}
- @Nullable
- public SqlInsert getInsertOrReplace()
+ @Override
+ public QueryContext queryContext()
{
- return insertOrReplace;
+ return plannerContext.getQueryContext();
}
- @Nullable
- public List<String> getReplaceIntervals()
+ @Override
+ public SchemaPlus defaultSchema()
{
- return replaceIntervals;
+ return frameworkConfig.getDefaultSchema();
}
- public SqlNode getQueryNode()
+ @Override
+ public ObjectMapper jsonMapper()
{
- return query;
+ return plannerContext.getJsonMapper();
}
- @Nullable
- public Granularity getIngestionGranularity()
+ @Override
+ public DateTimeZone timeZone()
{
- return ingestionGranularity;
+ return plannerContext.getTimeZone();
}
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
new file mode 100644
index 0000000000..e59b8cf5e7
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/IngestHandler.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.Pair;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
+import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
+import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
+import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
+import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.run.QueryMaker;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public abstract class IngestHandler extends QueryHandler
+{
+ private static final Pattern UNNAMED_COLUMN_PATTERN = Pattern.compile("^EXPR\\$\\d+$", Pattern.CASE_INSENSITIVE);
+ @VisibleForTesting
+ public static final String UNNAMED_INGESTION_COLUMN_ERROR =
+ "Cannot ingest expressions that do not have an alias "
+ + "or columns with names like EXPR$[digit].\n"
+ + "E.g. if you are ingesting \"func(X)\", then you can rewrite it as "
+ + "\"func(X) as myColumn\"";
+
+ protected final Granularity ingestionGranularity;
+ protected String targetDatasource;
+
+ IngestHandler(
+ HandlerContext handlerContext,
+ DruidSqlIngest ingestNode,
+ SqlNode queryNode,
+ SqlExplain explain
+ )
+ {
+ super(handlerContext, queryNode, explain);
+ this.ingestionGranularity = ingestNode.getPartitionedBy();
+ }
+
+ protected static SqlNode convertQuery(DruidSqlIngest sqlNode) throws ValidationException
+ {
+ SqlNode query = sqlNode.getSource();
+
+ // Check if ORDER BY clause is not provided to the underlying query
+ if (query instanceof SqlOrderBy) {
+ SqlOrderBy sqlOrderBy = (SqlOrderBy) query;
+ SqlNodeList orderByList = sqlOrderBy.orderList;
+ if (!(orderByList == null || orderByList.equals(SqlNodeList.EMPTY))) {
+ String opName = sqlNode.getOperator().getName();
+ throw new ValidationException(StringUtils.format(
+ "Cannot have ORDER BY on %s %s statement, use CLUSTERED BY instead.",
+ "INSERT".equals(opName) ? "an" : "a",
+ opName
+ ));
+ }
+ }
+ if (sqlNode.getClusteredBy() != null) {
+ query = DruidSqlParserUtils.convertClusterByToOrderBy(query, sqlNode.getClusteredBy());
+ }
+
+ if (!query.isA(SqlKind.QUERY)) {
+ throw new ValidationException(StringUtils.format("Cannot execute [%s].", query.getKind()));
+ }
+ return query;
+ }
+
+ protected String operationName()
+ {
+ return ingestNode().getOperator().getName();
+ }
+
+ protected abstract DruidSqlIngest ingestNode();
+
+ @Override
+ public void validate() throws ValidationException
+ {
+ if (ingestNode().getPartitionedBy() == null) {
+ throw new ValidationException(StringUtils.format(
+ "%s statements must specify PARTITIONED BY clause explicitly",
+ operationName()
+ ));
+ }
+ try {
+ PlannerContext plannerContext = handlerContext.plannerContext();
+ if (ingestionGranularity != null) {
+ plannerContext.getQueryContext().addSystemParam(
+ DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY,
+ plannerContext.getJsonMapper().writeValueAsString(ingestionGranularity)
+ );
+ }
+ }
+ catch (JsonProcessingException e) {
+ throw new ValidationException("Unable to serialize partition granularity.");
+ }
+ super.validate();
+ // Check if CTX_SQL_OUTER_LIMIT is specified and fail the query if it is. CTX_SQL_OUTER_LIMIT being provided causes
+ // the number of rows inserted to be limited which is likely to be confusing and unintended.
+ if (handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT) != null) {
+ throw new ValidationException(
+ StringUtils.format(
+ "%s cannot be provided with %s.",
+ PlannerContext.CTX_SQL_OUTER_LIMIT,
+ operationName()
+ )
+ );
+ }
+ targetDatasource = validateAndGetDataSourceForIngest();
+ resourceActions.add(new ResourceAction(new Resource(targetDatasource, ResourceType.DATASOURCE), Action.WRITE));
+ }
+
+ @Override
+ protected RelDataType returnedRowType()
+ {
+ final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+ return handlerContext.engine().resultTypeForInsert(
+ typeFactory,
+ rootQueryRel.validatedRowType);
+ }
+
+ /**
+ * Extract target datasource from a {@link SqlInsert}, and also validate that the ingestion is of a form we support.
+ * Expects the target datasource to be either an unqualified name, or a name qualified by the default schema.
+ */
+ private String validateAndGetDataSourceForIngest() throws ValidationException
+ {
+ final SqlInsert insert = ingestNode();
+ if (insert.isUpsert()) {
+ throw new ValidationException("UPSERT is not supported.");
+ }
+
+ if (insert.getTargetColumnList() != null) {
+ throw new ValidationException(operationName() + " with a target column list is not supported.");
+ }
+
+ final SqlIdentifier tableIdentifier = (SqlIdentifier) insert.getTargetTable();
+ final String dataSource;
+
+ if (tableIdentifier.names.isEmpty()) {
+ // I don't think this can happen, but include a branch for it just in case.
+ throw new ValidationException(operationName() + " requires a target table.");
+ } else if (tableIdentifier.names.size() == 1) {
+ // Unqualified name.
+ dataSource = Iterables.getOnlyElement(tableIdentifier.names);
+ } else {
+ // Qualified name.
+ final String defaultSchemaName =
+ Iterables.getOnlyElement(CalciteSchema.from(handlerContext.defaultSchema()).path(null));
+
+ if (tableIdentifier.names.size() == 2 && defaultSchemaName.equals(tableIdentifier.names.get(0))) {
+ dataSource = tableIdentifier.names.get(1);
+ } else {
+ throw new ValidationException(
+ StringUtils.format(
+ "Cannot %s into %s because it is not a Druid datasource.",
+ operationName(),
+ tableIdentifier
+ )
+ );
+ }
+ }
+
+ try {
+ IdUtils.validateId(operationName() + " dataSource", dataSource);
+ }
+ catch (IllegalArgumentException e) {
+ throw new ValidationException(e.getMessage());
+ }
+
+ return dataSource;
+ }
+
+ @Override
+ protected PlannerResult planForDruid() throws ValidationException
+ {
+ return planWithDruidConvention();
+ }
+
+ @Override
+ protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+ {
+ validateColumnsForIngestion(rootQueryRel);
+ return handlerContext.engine().buildQueryMakerForInsert(
+ targetDatasource,
+ rootQueryRel,
+ handlerContext.plannerContext());
+ }
+
+ private void validateColumnsForIngestion(RelRoot rootQueryRel) throws ValidationException
+ {
+ // Check that there are no unnamed columns in the insert.
+ for (Pair<Integer, String> field : rootQueryRel.fields) {
+ if (UNNAMED_COLUMN_PATTERN.matcher(field.right).matches()) {
+ throw new ValidationException(UNNAMED_INGESTION_COLUMN_ERROR);
+ }
+ }
+ }
+
+ /**
+ * Handler for the INSERT statement.
+ */
+ protected static class InsertHandler extends IngestHandler
+ {
+ private final DruidSqlInsert sqlNode;
+
+ public InsertHandler(
+ SqlStatementHandler.HandlerContext handlerContext,
+ DruidSqlInsert sqlNode,
+ SqlExplain explain
+ ) throws ValidationException
+ {
+ super(
+ handlerContext,
+ sqlNode,
+ convertQuery(sqlNode),
+ explain);
+ this.sqlNode = sqlNode;
+ }
+
+ @Override
+ public SqlNode sqlNode()
+ {
+ return sqlNode;
+ }
+
+ @Override
+ protected DruidSqlIngest ingestNode()
+ {
+ return sqlNode;
+ }
+
+ @Override
+ public void validate() throws ValidationException
+ {
+ if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_INSERT)) {
+ throw new ValidationException(StringUtils.format(
+ "Cannot execute INSERT with SQL engine '%s'.",
+ handlerContext.engine().name())
+ );
+ }
+ super.validate();
+ }
+ }
+
+ /**
+ * Handler for the REPLACE statement.
+ */
+ protected static class ReplaceHandler extends IngestHandler
+ {
+ private final DruidSqlReplace sqlNode;
+ private List<String> replaceIntervals;
+
+ public ReplaceHandler(
+ SqlStatementHandler.HandlerContext handlerContext,
+ DruidSqlReplace sqlNode,
+ SqlExplain explain
+ ) throws ValidationException
+ {
+ super(
+ handlerContext,
+ sqlNode,
+ convertQuery(sqlNode),
+ explain
+ );
+ this.sqlNode = sqlNode;
+ }
+
+ @Override
+ public SqlNode sqlNode()
+ {
+ return sqlNode;
+ }
+
+ @Override
+ protected DruidSqlIngest ingestNode()
+ {
+ return sqlNode;
+ }
+
+ @Override
+ public void validate() throws ValidationException
+ {
+ if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_REPLACE)) {
+ throw new ValidationException(StringUtils.format(
+ "Cannot execute REPLACE with SQL engine '%s'.",
+ handlerContext.engine().name())
+ );
+ }
+ SqlNode replaceTimeQuery = sqlNode.getReplaceTimeQuery();
+ if (replaceTimeQuery == null) {
+ throw new ValidationException("Missing time chunk information in OVERWRITE clause for REPLACE. Use "
+ + "OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.");
+ }
+
+ replaceIntervals = DruidSqlParserUtils.validateQueryAndConvertToIntervals(
+ replaceTimeQuery,
+ ingestionGranularity,
+ handlerContext.timeZone());
+ super.validate();
+ if (replaceIntervals != null) {
+ handlerContext.queryContext().addSystemParam(
+ DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS,
+ String.join(",", replaceIntervals)
+ );
+ }
+ }
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
index 57e3ba2cbc..cbb07ddb48 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerResult.java
@@ -32,12 +32,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class PlannerResult
{
- private final Supplier<QueryResponse> resultsSupplier;
+ private final Supplier<QueryResponse<Object[]>> resultsSupplier;
private final RelDataType rowType;
private final AtomicBoolean didRun = new AtomicBoolean();
public PlannerResult(
- final Supplier<QueryResponse> resultsSupplier,
+ final Supplier<QueryResponse<Object[]>> resultsSupplier,
final RelDataType rowType
)
{
@@ -53,7 +53,7 @@ public class PlannerResult
/**
* Run the query
*/
- public QueryResponse run()
+ public QueryResponse<Object[]> run()
{
if (!didRun.compareAndSet(false, true)) {
// Safety check.
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
new file mode 100644
index 0000000000..1d6a71b542
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java
@@ -0,0 +1,675 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.BindableConvention;
+import org.apache.calcite.interpreter.BindableRel;
+import org.apache.calcite.interpreter.Bindables;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.calcite.util.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.server.QueryResponse;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.rel.DruidConvention;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.DruidUnionRel;
+import org.apache.druid.sql.calcite.run.EngineFeature;
+import org.apache.druid.sql.calcite.run.QueryMaker;
+import org.apache.druid.sql.calcite.table.DruidTable;
+import org.apache.druid.utils.Throwables;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract base class for handlers that revolve around queries: SELECT,
+ * INSERT and REPLACE. This class handles the common SELECT portion of the statement.
+ */
+public abstract class QueryHandler extends SqlStatementHandler.BaseStatementHandler
+{
+ static final EmittingLogger log = new EmittingLogger(QueryHandler.class);
+
+ protected SqlNode queryNode;
+ protected SqlExplain explain;
+ protected SqlNode validatedQueryNode;
+ private boolean isPrepared;
+ protected RelRoot rootQueryRel;
+ private PrepareResult prepareResult;
+ protected RexBuilder rexBuilder;
+
+ public QueryHandler(SqlStatementHandler.HandlerContext handlerContext, SqlNode sqlNode, SqlExplain explain)
+ {
+ super(handlerContext);
+ this.queryNode = sqlNode;
+ this.explain = explain;
+ }
+
+ @Override
+ public void validate() throws ValidationException
+ {
+ CalcitePlanner planner = handlerContext.planner();
+ validatedQueryNode = planner.validate(rewriteParameters());
+
+ final SqlValidator validator = planner.getValidator();
+ SqlResourceCollectorShuttle resourceCollectorShuttle = new SqlResourceCollectorShuttle(
+ validator,
+ handlerContext.plannerContext()
+ );
+ validatedQueryNode.accept(resourceCollectorShuttle);
+ resourceActions = resourceCollectorShuttle.getResourceActions();
+ }
+
+ private SqlNode rewriteParameters()
+ {
+ // Uses {@link SqlParameterizerShuttle} to rewrite {@link SqlNode} to swap out any
+ // {@link org.apache.calcite.sql.SqlDynamicParam} early for their {@link SqlLiteral}
+ // replacement.
+ //
+ // Parameter replacement is done only if the client provides parameter values.
+ // If this is a PREPARE-only, then there will be no values even if the statement contains
+ // parameters. If this is a PLAN, then we'll catch later the case that the statement
+ // contains parameters, but no values were provided.
+ PlannerContext plannerContext = handlerContext.plannerContext();
+ if (plannerContext.getParameters().isEmpty()) {
+ return queryNode;
+ } else {
+ return queryNode.accept(new SqlParameterizerShuttle(plannerContext));
+ }
+ }
+
+ @Override
+ public void prepare()
+ {
+ if (isPrepared) {
+ return;
+ }
+ isPrepared = true;
+ rootQueryRel = handlerContext.planner().rel(validatedQueryNode);
+ final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+ final SqlValidator validator = handlerContext.planner().getValidator();
+ final RelDataType parameterTypes = validator.getParameterRowType(validatedQueryNode);
+ final RelDataType returnedRowType;
+
+ if (explain != null) {
+ returnedRowType = getExplainStructType(typeFactory);
+ } else {
+ returnedRowType = returnedRowType();
+ }
+
+ prepareResult = new PrepareResult(rootQueryRel.validatedRowType, returnedRowType, parameterTypes);
+ }
+
+ @Override
+ public PrepareResult prepareResult()
+ {
+ return prepareResult;
+ }
+
+ protected abstract RelDataType returnedRowType();
+
+ private static RelDataType getExplainStructType(RelDataTypeFactory typeFactory)
+ {
+ return typeFactory.createStructType(
+ ImmutableList.of(
+ Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR),
+ Calcites.createSqlType(typeFactory, SqlTypeName.VARCHAR)
+ ),
+ ImmutableList.of("PLAN", "RESOURCES")
+ );
+ }
+
+ @Override
+ public PlannerResult plan() throws ValidationException
+ {
+ prepare();
+ final Set<RelOptTable> bindableTables = getBindableTables(rootQueryRel.rel);
+
+ // the planner's type factory is not available until after parsing
+ rexBuilder = new RexBuilder(handlerContext.planner().getTypeFactory());
+
+ try {
+ if (!bindableTables.isEmpty()) {
+ // Consider BINDABLE convention when necessary. Used for metadata tables.
+
+ if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.ALLOW_BINDABLE_PLAN)) {
+ throw new ValidationException(
+ StringUtils.format(
+ "Cannot query table%s %s with SQL engine '%s'.",
+ bindableTables.size() != 1 ? "s" : "",
+ bindableTables.stream()
+ .map(table -> Joiner.on(".").join(table.getQualifiedName()))
+ .collect(Collectors.joining(", ")),
+ handlerContext.engine().name()
+ )
+ );
+ }
+
+ return planWithBindableConvention();
+ } else {
+ // Druid convention is used whenever there are no tables that require BINDABLE.
+ return planForDruid();
+ }
+ }
+ catch (Exception e) {
+ Throwable cannotPlanException = Throwables.getCauseOfType(e, RelOptPlanner.CannotPlanException.class);
+ if (null == cannotPlanException) {
+ // Not a CannotPlanException, rethrow without logging.
+ throw e;
+ }
+
+ Logger logger = log;
+ if (!handlerContext.queryContext().isDebug()) {
+ logger = log.noStackTrace();
+ }
+ String errorMessage = buildSQLPlanningErrorMessage(cannotPlanException);
+ logger.warn(e, errorMessage);
+ throw new UnsupportedSQLQueryException(errorMessage);
+ }
+ }
+
+ private static Set<RelOptTable> getBindableTables(final RelNode relNode)
+ {
+ class HasBindableVisitor extends RelVisitor
+ {
+ private final Set<RelOptTable> found = new HashSet<>();
+
+ @Override
+ public void visit(RelNode node, int ordinal, RelNode parent)
+ {
+ if (node instanceof TableScan) {
+ RelOptTable table = node.getTable();
+ if (table.unwrap(ScannableTable.class) != null && table.unwrap(DruidTable.class) == null) {
+ found.add(table);
+ return;
+ }
+ }
+
+ super.visit(node, ordinal, parent);
+ }
+ }
+
+ final HasBindableVisitor visitor = new HasBindableVisitor();
+ visitor.go(relNode);
+ return visitor.found;
+ }
+
+ /**
+ * Construct a {@link PlannerResult} for a fall-back 'bindable' rel, for
+ * things that are not directly translatable to native Druid queries such
+ * as system tables and just a general purpose (but definitely not optimized)
+ * fall-back.
+ *
+ * See {@link #planWithDruidConvention} which will handle things which are
+ * directly translatable to native Druid queries.
+ *
+ * The bindable path handles parameter substitution of any values not
+ * bound by the earlier steps.
+ */
+ private PlannerResult planWithBindableConvention()
+ {
+ CalcitePlanner planner = handlerContext.planner();
+ BindableRel bindableRel = (BindableRel) planner.transform(
+ CalciteRulesManager.BINDABLE_CONVENTION_RULES,
+ planner.getEmptyTraitSet().replace(BindableConvention.INSTANCE).plus(rootQueryRel.collation),
+ rootQueryRel.rel
+ );
+
+ if (!rootQueryRel.isRefTrivial()) {
+ // Add a projection on top to accommodate root.fields.
+ final List<RexNode> projects = new ArrayList<>();
+ final RexBuilder rexBuilder = bindableRel.getCluster().getRexBuilder();
+ for (int field : Pair.left(rootQueryRel.fields)) {
+ projects.add(rexBuilder.makeInputRef(bindableRel, field));
+ }
+ bindableRel = new Bindables.BindableProject(
+ bindableRel.getCluster(),
+ bindableRel.getTraitSet(),
+ bindableRel,
+ projects,
+ rootQueryRel.validatedRowType
+ );
+ }
+
+ PlannerContext plannerContext = handlerContext.plannerContext();
+ if (explain != null) {
+ return planExplanation(bindableRel, false);
+ } else {
+ final BindableRel theRel = bindableRel;
+ final DataContext dataContext = plannerContext.createDataContext(
+ planner.getTypeFactory(),
+ plannerContext.getParameters()
+ );
+ final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
+ final Enumerable<?> enumerable = theRel.bind(dataContext);
+ final Enumerator<?> enumerator = enumerable.enumerator();
+ return QueryResponse.withEmptyContext(
+ Sequences.withBaggage(new BaseSequence<>(
+ new BaseSequence.IteratorMaker<Object[], QueryHandler.EnumeratorIterator<Object[]>>()
+ {
+ @Override
+ public QueryHandler.EnumeratorIterator<Object[]> make()
+ {
+ return new QueryHandler.EnumeratorIterator<>(new Iterator<Object[]>()
+ {
+ @Override
+ public boolean hasNext()
+ {
+ return enumerator.moveNext();
+ }
+
+ @Override
+ public Object[] next()
+ {
+ return (Object[]) enumerator.current();
+ }
+ });
+ }
+
+ @Override
+ public void cleanup(QueryHandler.EnumeratorIterator<Object[]> iterFromMake)
+ {
+
+ }
+ }
+ ), enumerator::close)
+ );
+ };
+ return new PlannerResult(resultsSupplier, rootQueryRel.validatedRowType);
+ }
+ }
+
+ /**
+ * Construct a {@link PlannerResult} for an 'explain' query from a {@link RelNode}
+ */
+ protected PlannerResult planExplanation(
+ final RelNode rel,
+ final boolean isDruidConventionExplanation
+ )
+ {
+ PlannerContext plannerContext = handlerContext.plannerContext();
+ String explanation = RelOptUtil.dumpPlan("", rel, explain.getFormat(), explain.getDetailLevel());
+ String resourcesString;
+ try {
+ if (isDruidConventionExplanation && rel instanceof DruidRel) {
+ // Show the native queries instead of Calcite's explain if the legacy flag is turned off
+ if (plannerContext.getPlannerConfig().isUseNativeQueryExplain()) {
+ DruidRel<?> druidRel = (DruidRel<?>) rel;
+ try {
+ explanation = explainSqlPlanAsNativeQueries(druidRel);
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Unable to translate to a native Druid query. Resorting to legacy Druid explain plan.");
+ }
+ }
+ }
+ final Set<Resource> resources =
+ plannerContext.getResourceActions().stream().map(ResourceAction::getResource).collect(Collectors.toSet());
+ resourcesString = plannerContext.getJsonMapper().writeValueAsString(resources);
+ }
+ catch (JsonProcessingException jpe) {
+ // this should never happen, we create the Resources here, not a user
+ log.error(jpe, "Encountered exception while serializing resources for explain output");
+ resourcesString = null;
+ }
+ final Supplier<QueryResponse<Object[]>> resultsSupplier = Suppliers.ofInstance(
+ QueryResponse.withEmptyContext(
+ Sequences.simple(ImmutableList.of(new Object[]{explanation, resourcesString}))
+ )
+ );
+ return new PlannerResult(resultsSupplier, getExplainStructType(rel.getCluster().getTypeFactory()));
+ }
+
+ /**
+ * This method doesn't utilize the Calcite's internal {@link RelOptUtil#dumpPlan} since that tends to be verbose
+ * and not indicative of the native Druid Queries which will get executed
+ * This method assumes that the Planner has converted the RelNodes to DruidRels, and thereby we can implicitly cast it
+ *
+ * @param rel Instance of the root {@link DruidRel} which is formed by running the planner transformations on it
+ * @return A string representing an array of native queries that correspond to the given SQL query, in JSON format
+ * @throws JsonProcessingException
+ */
+ private String explainSqlPlanAsNativeQueries(DruidRel<?> rel) throws JsonProcessingException
+ {
+ ObjectMapper jsonMapper = handlerContext.jsonMapper();
+ List<DruidQuery> druidQueryList;
+ druidQueryList = flattenOutermostRel(rel)
+ .stream()
+ .map(druidRel -> druidRel.toDruidQuery(false))
+ .collect(Collectors.toList());
+
+
+ // Putting the queries as object node in an ArrayNode, since directly returning a list causes issues when
+ // serializing the "queryType". Another method would be to create a POJO containing query and signature, and then
+ // serializing it using normal list method.
+ ArrayNode nativeQueriesArrayNode = jsonMapper.createArrayNode();
+
+ for (DruidQuery druidQuery : druidQueryList) {
+ Query<?> nativeQuery = druidQuery.getQuery();
+ ObjectNode objectNode = jsonMapper.createObjectNode();
+ objectNode.put("query", jsonMapper.convertValue(nativeQuery, ObjectNode.class));
+ objectNode.put("signature", jsonMapper.convertValue(druidQuery.getOutputRowSignature(), ArrayNode.class));
+ nativeQueriesArrayNode.add(objectNode);
+ }
+
+ return jsonMapper.writeValueAsString(nativeQueriesArrayNode);
+ }
+
+ /**
+ * Given a {@link DruidRel}, this method recursively flattens the Rels if they are of the type {@link DruidUnionRel}
+ * It is implicitly assumed that the {@link DruidUnionRel} can never be the child of a non {@link DruidUnionRel}
+ * node
+ * E.g. a DruidRel structure of kind:<pre><code>
+ * DruidUnionRel
+ * DruidUnionRel
+ * DruidRel (A)
+ * DruidRel (B)
+ * DruidRel(C)
+ * </code</pre>will return {@code [DruidRel(A), DruidRel(B), DruidRel(C)]}.
+ *
+ * @param outermostDruidRel The outermost rel which is to be flattened
+ * @return a list of DruidRel's which do not have a DruidUnionRel nested in between them
+ */
+ private List<DruidRel<?>> flattenOutermostRel(DruidRel<?> outermostDruidRel)
+ {
+ List<DruidRel<?>> druidRels = new ArrayList<>();
+ flattenOutermostRel(outermostDruidRel, druidRels);
+ return druidRels;
+ }
+
+ /**
+ * Recursive function (DFS) which traverses the nodes and collects the corresponding {@link DruidRel} into a list if
+ * they are not of the type {@link DruidUnionRel} or else calls the method with the child nodes. The DFS order of the
+ * nodes are retained, since that is the order in which they will actually be called in {@link DruidUnionRel#runQuery()}
+ *
+ * @param druidRel The current relNode
+ * @param flattendListAccumulator Accumulator list which needs to be appended by this method
+ */
+ private void flattenOutermostRel(DruidRel<?> druidRel, List<DruidRel<?>> flattendListAccumulator)
+ {
+ if (druidRel instanceof DruidUnionRel) {
+ DruidUnionRel druidUnionRel = (DruidUnionRel) druidRel;
+ druidUnionRel.getInputs().forEach(innerRelNode -> {
+ DruidRel<?> innerDruidRelNode = (DruidRel<?>) innerRelNode; // This type conversion should always be possible
+ flattenOutermostRel(innerDruidRelNode, flattendListAccumulator);
+ });
+ } else {
+ flattendListAccumulator.add(druidRel);
+ }
+ }
+
+ protected abstract PlannerResult planForDruid() throws ValidationException;
+
+ /**
+ * Construct a {@link PlannerResult} for a {@link RelNode} that is directly translatable to a native Druid query.
+ */
+ protected PlannerResult planWithDruidConvention() throws ValidationException
+ {
+ final RelRoot possiblyLimitedRoot = possiblyWrapRootWithOuterLimitFromContext(rootQueryRel);
+ final QueryMaker queryMaker = buildQueryMaker(possiblyLimitedRoot);
+ PlannerContext plannerContext = handlerContext.plannerContext();
+ plannerContext.setQueryMaker(queryMaker);
+
+ // Fall-back dynamic parameter substitution using {@link RelParameterizerShuttle}
+ // in the event that {@link #rewriteDynamicParameters(SqlNode)} was unable to
+ // successfully substitute all parameter values, and will cause a failure if any
+ // dynamic a parameters are not bound. This occurs at least for DATE parameters
+ // with integer values.
+ //
+ // This check also catches the case where we did not do a parameter check earlier
+ // because no values were provided. (Values are not required in the PREPARE case
+ // but now that we're planning, we require them.)
+ RelNode parameterized = possiblyLimitedRoot.rel.accept(
+ new RelParameterizerShuttle(plannerContext)
+ );
+ CalcitePlanner planner = handlerContext.planner();
+ final DruidRel<?> druidRel = (DruidRel<?>) planner.transform(
+ CalciteRulesManager.DRUID_CONVENTION_RULES,
+ planner.getEmptyTraitSet()
+ .replace(DruidConvention.instance())
+ .plus(rootQueryRel.collation),
+ parameterized
+ );
+
+ if (explain != null) {
+ return planExplanation(druidRel, true);
+ } else {
+ // Compute row type.
+ final RelDataType rowType = prepareResult.getReturnedRowType();
+
+ // Start the query.
+ final Supplier<QueryResponse<Object[]>> resultsSupplier = () -> {
+ // sanity check
+ final Set<ResourceAction> readResourceActions =
+ plannerContext.getResourceActions()
+ .stream()
+ .filter(action -> action.getAction() == Action.READ)
+ .collect(Collectors.toSet());
+ Preconditions.checkState(
+ readResourceActions.isEmpty() == druidRel.getDataSourceNames().isEmpty()
+ // The resources found in the plannerContext can be less than the datasources in
+ // the query plan, because the query planner can eliminate empty tables by replacing
+ // them with InlineDataSource of empty rows.
+ || readResourceActions.size() >= druidRel.getDataSourceNames().size(),
+ "Authorization sanity check failed"
+ );
+
+ return druidRel.runQuery();
+ };
+
+ return new PlannerResult(resultsSupplier, rowType);
+ }
+ }
+
+ /**
+ * This method wraps the root with a {@link LogicalSort} that applies a limit (no ordering change). If the outer rel
+ * is already a {@link Sort}, we can merge our outerLimit into it, similar to what is going on in
+ * {@link org.apache.druid.sql.calcite.rule.SortCollapseRule}.
+ *
+ * The {@link PlannerContext#CTX_SQL_OUTER_LIMIT} flag that controls this wrapping is meant for internal use only by
+ * the web console, allowing it to apply a limit to queries without rewriting the original SQL.
+ *
+ * @param root root node
+ *
+ * @return root node wrapped with a limiting logical sort if a limit is specified in the query context.
+ */
+ @Nullable
+ private RelRoot possiblyWrapRootWithOuterLimitFromContext(RelRoot root)
+ {
+ Object outerLimitObj = handlerContext.queryContext().get(PlannerContext.CTX_SQL_OUTER_LIMIT);
+ Long outerLimit = DimensionHandlerUtils.convertObjectToLong(outerLimitObj, true);
+ if (outerLimit == null) {
+ return root;
+ }
+
+ final LogicalSort newRootRel;
+
+ if (root.rel instanceof Sort) {
+ Sort sort = (Sort) root.rel;
+
+ final OffsetLimit originalOffsetLimit = OffsetLimit.fromSort(sort);
+ final OffsetLimit newOffsetLimit = originalOffsetLimit.andThen(new OffsetLimit(0, outerLimit));
+
+ if (newOffsetLimit.equals(originalOffsetLimit)) {
+ // nothing to do, don't bother to make a new sort
+ return root;
+ }
+
+ newRootRel = LogicalSort.create(
+ sort.getInput(),
+ sort.collation,
+ newOffsetLimit.getOffsetAsRexNode(rexBuilder),
+ newOffsetLimit.getLimitAsRexNode(rexBuilder)
+ );
+ } else {
+ newRootRel = LogicalSort.create(
+ root.rel,
+ root.collation,
+ null,
+ new OffsetLimit(0, outerLimit).getLimitAsRexNode(rexBuilder)
+ );
+ }
+
+ return new RelRoot(newRootRel, root.validatedRowType, root.kind, root.fields, root.collation);
+ }
+
+ protected abstract QueryMaker buildQueryMaker(RelRoot rootQueryRel) throws ValidationException;
+
+ private String buildSQLPlanningErrorMessage(Throwable exception)
+ {
+ String errorMessage = handlerContext.plannerContext().getPlanningError();
+ if (null == errorMessage && exception instanceof UnsupportedSQLQueryException) {
+ errorMessage = exception.getMessage();
+ }
+ if (null == errorMessage) {
+ errorMessage = "Please check Broker logs for additional details.";
+ } else {
+ // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong.
+ errorMessage = "Possible error: " + errorMessage;
+ }
+ // Finally, add the query itself to error message that user will get.
+ return StringUtils.format(
+ "Query not supported. %s SQL was: %s", errorMessage,
+ handlerContext.plannerContext().getSql()
+ );
+ }
+
+ public static class SelectHandler extends QueryHandler
+ {
+ private final SqlNode sqlNode;
+
+ public SelectHandler(
+ HandlerContext handlerContext,
+ SqlNode sqlNode,
+ SqlExplain explain)
+ {
+ super(handlerContext, sqlNode, explain);
+ this.sqlNode = sqlNode;
+ }
+
+ @Override
+ public SqlNode sqlNode()
+ {
+ return sqlNode;
+ }
+
+ @Override
+ public void validate() throws ValidationException
+ {
+ if (!handlerContext.plannerContext().engineHasFeature(EngineFeature.CAN_SELECT)) {
+ throw new ValidationException(StringUtils.format(
+ "Cannot execute SELECT with SQL engine '%s'.",
+ handlerContext.engine().name())
+ );
+ }
+ super.validate();
+ }
+
+ @Override
+ protected RelDataType returnedRowType()
+ {
+ final RelDataTypeFactory typeFactory = rootQueryRel.rel.getCluster().getTypeFactory();
+ return handlerContext.engine().resultTypeForSelect(
+ typeFactory,
+ rootQueryRel.validatedRowType
+ );
+ }
+
+ @Override
+ protected PlannerResult planForDruid() throws ValidationException
+ {
+ return planWithDruidConvention();
+ }
+
+ @Override
+ protected QueryMaker buildQueryMaker(final RelRoot rootQueryRel) throws ValidationException
+ {
+ return handlerContext.engine().buildQueryMakerForSelect(
+ rootQueryRel,
+ handlerContext.plannerContext());
+ }
+ }
+
+ private static class EnumeratorIterator<T> implements Iterator<T>
+ {
+ private final Iterator<T> it;
+
+ EnumeratorIterator(Iterator<T> it)
+ {
+ this.it = it;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return it.hasNext();
+ }
+
+ @Override
+ public T next()
+ {
+ return it.next();
+ }
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
new file mode 100644
index 0000000000..fa8c4fdb17
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/SqlStatementHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.planner;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.joda.time.DateTimeZone;
+
+import java.util.Set;
+
+/**
+ * Handler for a SQL statement. Follows the same lifecycle as the planner,
+ * however this class handles one specific kind of SQL statement.
+ */
+public interface SqlStatementHandler
+{
+ SqlNode sqlNode();
+ void validate() throws ValidationException;
+ Set<ResourceAction> resourceActions();
+ void prepare();
+ PrepareResult prepareResult();
+ PlannerResult plan() throws ValidationException;
+
+ /**
+ * Context available to statement handlers.
+ */
+ interface HandlerContext
+ {
+ PlannerContext plannerContext();
+ SqlEngine engine();
+ CalcitePlanner planner();
+ QueryContext queryContext();
+ SchemaPlus defaultSchema();
+ ObjectMapper jsonMapper();
+ DateTimeZone timeZone();
+ }
+
+ abstract class BaseStatementHandler implements SqlStatementHandler
+ {
+ protected final HandlerContext handlerContext;
+ protected Set<ResourceAction> resourceActions;
+
+ protected BaseStatementHandler(HandlerContext handlerContext)
+ {
+ this.handlerContext = handlerContext;
+ }
+
+ @Override
+ public Set<ResourceAction> resourceActions()
+ {
+ return resourceActions;
+ }
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 185d96b945..fcf9fb754d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -133,7 +133,7 @@ public class DruidQuery
@Nullable
private final Sorting sorting;
- private final Query query;
+ private final Query<?> query;
private final RowSignature outputRowSignature;
private final RelDataType outputRowType;
private final VirtualColumnRegistry virtualColumnRegistry;
@@ -795,7 +795,7 @@ public class DruidQuery
return outputRowSignature;
}
- public Query getQuery()
+ public Query<?> getQuery()
{
return query;
}
@@ -806,7 +806,7 @@ public class DruidQuery
*
* @return Druid query
*/
- private Query computeQuery()
+ private Query<?> computeQuery()
{
if (dataSource instanceof QueryDataSource) {
// If there is a subquery, then we prefer the outer query to be a groupBy if possible, since this potentially
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
index 9043577a7d..7bf305d42b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRel.java
@@ -28,7 +28,7 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import javax.annotation.Nullable;
import java.util.Set;
-public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
+public abstract class DruidRel<T extends DruidRel<?>> extends AbstractRelNode
{
private final PlannerContext plannerContext;
@@ -45,7 +45,7 @@ public abstract class DruidRel<T extends DruidRel> extends AbstractRelNode
@Nullable
public abstract PartialDruidQuery getPartialDruidQuery();
- public QueryResponse runQuery()
+ public QueryResponse<Object[]> runQuery()
{
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query, and it will actually get run as a native query. Druid's native query layer will
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
index de1bc8b758..f754fc0cf0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionRel.java
@@ -107,11 +107,11 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
- public QueryResponse runQuery()
+ public QueryResponse<Object[]> runQuery()
{
// Lazy: run each query in sequence, not all at once.
if (limit == 0) {
- return new QueryResponse(Sequences.empty(), ResponseContext.createEmpty());
+ return new QueryResponse<Object[]>(Sequences.empty(), ResponseContext.createEmpty());
} else {
// We run the first rel here for two reasons:
@@ -122,10 +122,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
// is also sub-optimal as it would consume parallel query resources and potentially starve the system.
// Instead, we only return the headers from the first query and potentially exception out and fail the query
// if there are any response headers that come from subsequent queries that are correctness concerns
- final QueryResponse queryResponse = ((DruidRel) rels.get(0)).runQuery();
+ final QueryResponse<Object[]> queryResponse = ((DruidRel) rels.get(0)).runQuery();
- final List<Sequence<Object>> firstAsList = Collections.singletonList(queryResponse.getResults());
- final Iterable<Sequence<Object>> theRestTransformed = FluentIterable
+ final List<Sequence<Object[]>> firstAsList = Collections.singletonList(queryResponse.getResults());
+ final Iterable<Sequence<Object[]>> theRestTransformed = FluentIterable
.from(rels.subList(1, rels.size()))
.transform(
rel -> {
@@ -144,10 +144,10 @@ public class DruidUnionRel extends DruidRel<DruidUnionRel>
}
);
- final Iterable<Sequence<Object>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
+ final Iterable<Sequence<Object[]>> recombinedSequences = Iterables.concat(firstAsList, theRestTransformed);
final Sequence returnSequence = Sequences.concat(recombinedSequences);
- return new QueryResponse(
+ return new QueryResponse<Object[]>(
limit > 0 ? returnSequence.limit(limit) : returnSequence,
queryResponse.getResponseContext()
);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
index f045769ec9..27a5462ba7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeQueryMaker.java
@@ -94,7 +94,7 @@ public class NativeQueryMaker implements QueryMaker
}
@Override
- public QueryResponse runQuery(final DruidQuery druidQuery)
+ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
final Query<?> query = druidQuery.getQuery();
@@ -173,7 +173,8 @@ public class NativeQueryMaker implements QueryMaker
.orElseGet(query::getIntervals);
}
- private <T> QueryResponse execute(Query<T> query, final List<String> newFields, final List<SqlTypeName> newTypes)
+ @SuppressWarnings("unchecked")
+ private <T> QueryResponse<Object[]> execute(Query<?> query, final List<String> newFields, final List<SqlTypeName> newTypes)
{
Hook.QUERY_PLAN.run(query);
@@ -195,14 +196,19 @@ public class NativeQueryMaker implements QueryMaker
// otherwise it won't yet be initialized. (A bummer, since ideally, we'd verify the toolChest exists and can do
// array-based results before starting the query; but in practice we don't expect this to happen since we keep
// tight control over which query types we generate in the SQL layer. They all support array-based results.)
- final QueryResponse results = queryLifecycle.runSimple(query, authenticationResult, authorizationResult);
-
-
- return mapResultSequence(results, queryLifecycle.getToolChest(), query, newFields, newTypes);
+ final QueryResponse<T> results = queryLifecycle.runSimple((Query<T>) query, authenticationResult, authorizationResult);
+
+ return mapResultSequence(
+ results,
+ (QueryToolChest<T, Query<T>>) queryLifecycle.getToolChest(),
+ (Query<T>) query,
+ newFields,
+ newTypes
+ );
}
- private <T> QueryResponse mapResultSequence(
- final QueryResponse results,
+ private <T> QueryResponse<Object[]> mapResultSequence(
+ final QueryResponse<T> results,
final QueryToolChest<T, Query<T>> toolChest,
final Query<T> query,
final List<String> newFields,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
index 8acc02230c..8039d60c3e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/QueryMaker.java
@@ -33,5 +33,5 @@ public interface QueryMaker
* created for. The returned arrays match the row type given by {@link SqlEngine#resultTypeForSelect} or
* {@link SqlEngine#resultTypeForInsert}, depending on the nature of the statement.
*/
- QueryResponse runQuery(DruidQuery druidQuery);
+ QueryResponse<Object[]> runQuery(DruidQuery druidQuery);
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index db7f7d4ae7..dc4bfbcfb6 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -128,7 +128,7 @@ public class SqlResource
try {
Thread.currentThread().setName(StringUtils.format("sql[%s]", sqlQueryId));
ResultSet resultSet = stmt.plan();
- final QueryResponse response = resultSet.run();
+ final QueryResponse<Object[]> response = resultSet.run();
final SqlRowTransformer rowTransformer = resultSet.createRowTransformer();
final Yielder<Object[]> finalYielder = Yielders.each(response.getResults());
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
index 92d0a5773f..46d3e7fccd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java
@@ -43,7 +43,7 @@ import org.apache.druid.sql.calcite.external.ExternalDataSource;
import org.apache.druid.sql.calcite.external.ExternalOperatorConversion;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.apache.druid.sql.calcite.planner.DruidPlanner;
+import org.apache.druid.sql.calcite.planner.IngestHandler;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
@@ -204,7 +204,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("INSERT INTO dst (foo, bar) SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "INSERT with target column list is not supported.")
+ .expectValidationError(SqlPlanningException.class, "INSERT with a target column list is not supported.")
.verify();
}
@@ -226,7 +226,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO dst SELECT * FROM INFORMATION_SCHEMA.COLUMNS PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot query table [INFORMATION_SCHEMA.COLUMNS] with SQL engine 'ingestion-test'."
+ "Cannot query table INFORMATION_SCHEMA.COLUMNS with SQL engine 'ingestion-test'."
)
.verify();
}
@@ -238,7 +238,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO INFORMATION_SCHEMA.COLUMNS SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+ "Cannot INSERT into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
)
.verify();
}
@@ -250,7 +250,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO view.aview SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [view.aview] because it is not a Druid datasource (schema = druid)."
+ "Cannot INSERT into view.aview because it is not a Druid datasource."
)
.verify();
}
@@ -280,7 +280,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO nonexistent.dst SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot INSERT into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+ "Cannot INSERT into nonexistent.dst because it is not a Druid datasource."
)
.verify();
}
@@ -435,7 +435,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
)
.expectValidationError(
SqlPlanningException.class,
- "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause"
+ "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause"
)
.verify();
}
@@ -517,7 +517,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
}
catch (SqlPlanningException e) {
Assert.assertEquals(
- "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+ "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
e.getMessage()
);
}
@@ -561,7 +561,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
}
catch (SqlPlanningException e) {
Assert.assertEquals(
- "Cannot have ORDER BY on an INSERT query, use CLUSTERED BY instead.",
+ "Cannot have ORDER BY on an INSERT statement, use CLUSTERED BY instead.",
e.getMessage()
);
}
@@ -796,7 +796,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO t SELECT dim1, dim2 || '-lol' FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
- DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+ IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}
@@ -808,7 +808,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
.sql("INSERT INTO t SELECT __time, dim1 AS EXPR$0 FROM foo PARTITIONED BY ALL")
.expectValidationError(
SqlPlanningException.class,
- DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+ IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}
@@ -822,7 +822,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
+ "(SELECT __time, LOWER(dim1) FROM foo) PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- DruidPlanner.UNNAMED_INGESTION_COLUMN_ERROR
+ IngestHandler.UNNAMED_INGESTION_COLUMN_ERROR
)
.verify();
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
index a244abef6a..eb31a17fc9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteReplaceDmlTest.java
@@ -255,7 +255,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT * FROM foo ORDER BY dim1 PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE query, use CLUSTERED BY instead.")
+ .expectValidationError(SqlPlanningException.class, "Cannot have ORDER BY on a REPLACE statement, use CLUSTERED BY instead.")
.verify();
}
@@ -390,7 +390,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst (foo, bar) OVERWRITE ALL SELECT dim1, dim2 FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "REPLACE with target column list is not supported.")
+ .expectValidationError(SqlPlanningException.class, "REPLACE with a target column list is not supported.")
.verify();
}
@@ -408,7 +408,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE ALL SELECT __time, FLOOR(m1) as floor_m1, dim1 FROM foo CLUSTERED BY dim1")
- .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In druid, the CLUSTERED BY clause has to be specified after the PARTITIONED BY clause")
+ .expectValidationError(SqlPlanningException.class, "CLUSTERED BY found before PARTITIONED BY. In Druid, the CLUSTERED BY clause must follow the PARTITIONED BY clause")
.verify();
}
@@ -417,7 +417,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst SELECT * FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+ .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
.verify();
}
@@ -426,7 +426,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
{
testIngestionQuery()
.sql("REPLACE INTO dst OVERWRITE SELECT * FROM foo PARTITIONED BY ALL TIME")
- .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE, set it to OVERWRITE WHERE <__time based condition> or set it to overwrite the entire table with OVERWRITE ALL.")
+ .expectValidationError(SqlPlanningException.class, "Missing time chunk information in OVERWRITE clause for REPLACE. Use OVERWRITE WHERE <__time based condition> or OVERWRITE ALL to overwrite the entire table.")
.verify();
}
@@ -437,7 +437,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO INFORMATION_SCHEMA.COLUMNS OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot REPLACE into [INFORMATION_SCHEMA.COLUMNS] because it is not a Druid datasource (schema = druid)."
+ "Cannot REPLACE into INFORMATION_SCHEMA.COLUMNS because it is not a Druid datasource."
)
.verify();
}
@@ -449,7 +449,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO view.aview OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot REPLACE into [view.aview] because it is not a Druid datasource (schema = druid)."
+ "Cannot REPLACE into view.aview because it is not a Druid datasource."
)
.verify();
}
@@ -479,7 +479,7 @@ public class CalciteReplaceDmlTest extends CalciteIngestionDmlTest
.sql("REPLACE INTO nonexistent.dst OVERWRITE ALL SELECT * FROM foo PARTITIONED BY ALL TIME")
.expectValidationError(
SqlPlanningException.class,
- "Cannot REPLACE into [nonexistent.dst] because it is not a Druid datasource (schema = druid)."
+ "Cannot REPLACE into nonexistent.dst because it is not a Druid datasource."
)
.verify();
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
index 22c004947a..8562ce29be 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/TestInsertQueryMaker.java
@@ -45,7 +45,7 @@ public class TestInsertQueryMaker implements QueryMaker
}
@Override
- public QueryResponse runQuery(final DruidQuery druidQuery)
+ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
{
// Don't actually execute anything, but do record information that tests will check for.
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index d1bb098b84..88d754e0fa 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -2077,7 +2077,7 @@ public class SqlResourceTest extends CalciteTestBase
return new ResultSet(plannerResult)
{
@Override
- public QueryResponse run()
+ public QueryResponse<Object[]> run()
{
final Function<Sequence<Object[]>, Sequence<Object[]>> sequenceMapFn =
Optional.ofNullable(sequenceMapFnSupplier.get()).orElse(Function.identity());
@@ -2085,12 +2085,12 @@ public class SqlResourceTest extends CalciteTestBase
final NonnullPair<CountDownLatch, Boolean> executeLatch = executeLatchSupplier.get();
if (executeLatch != null) {
if (executeLatch.rhs) {
- final QueryResponse resp = super.run();
+ final QueryResponse<Object[]> resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
executeLatch.lhs.countDown();
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
- return new QueryResponse(sequence, respContext);
+ return new QueryResponse<>(sequence, respContext);
} else {
try {
if (!executeLatch.lhs.await(WAIT_TIMEOUT_SECS, TimeUnit.SECONDS)) {
@@ -2103,11 +2103,11 @@ public class SqlResourceTest extends CalciteTestBase
}
}
- final QueryResponse resp = super.run();
+ final QueryResponse<Object[]> resp = super.run();
Sequence<Object[]> sequence = sequenceMapFn.apply(resp.getResults());
final ResponseContext respContext = resp.getResponseContext();
respContext.merge(responseContextSupplier.get());
- return new QueryResponse(sequence, respContext);
+ return new QueryResponse<>(sequence, respContext);
}
};
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org