You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/08/27 23:15:39 UTC

[GitHub] [druid] clintropolis commented on a change in pull request #10324: SQL support for union datasources.

clintropolis commented on a change in pull request #10324:
URL: https://github.com/apache/druid/pull/10324#discussion_r478740350



##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidUnionDataSourceRule.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rule;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.util.mapping.Mappings;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.rel.DruidQueryRel;
+import org.apache.druid.sql.calcite.rel.DruidRel;
+import org.apache.druid.sql.calcite.rel.DruidRels;
+import org.apache.druid.sql.calcite.rel.DruidUnionDataSourceRel;
+import org.apache.druid.sql.calcite.rel.PartialDruidQuery;
+import org.apache.druid.sql.calcite.table.DruidTable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Creates a {@link DruidUnionDataSourceRel} from various {@link DruidQueryRel} inputs that represent simple
+ * table scans.
+ */
+public class DruidUnionDataSourceRule extends RelOptRule
+{
+  private static final DruidUnionDataSourceRule INSTANCE = new DruidUnionDataSourceRule();
+
+  /**
+   * Registers a pattern of a {@link Union} whose first input operand is a {@link DruidRel} and whose second input
+   * operand is a {@link DruidQueryRel}.
+   */
+  private DruidUnionDataSourceRule()
+  {
+    super(operand(Union.class, operand(DruidRel.class, none()), operand(DruidQueryRel.class, none())));
+  }
+
+  /**
+   * Returns the shared singleton instance of this rule.
+   */
+  public static DruidUnionDataSourceRule instance()
+  {
+    return INSTANCE;
+  }
+
+  /**
+   * Decides whether this rule may fire for the matched rels.
+   *
+   * @param call rule call whose rels are: 0 = the {@link Union}, 1 = its first input, 2 = its second input
+   *
+   * @return true only for a UNION ALL with exactly two inputs whose column names are compatible
+   */
+  @Override
+  public boolean matches(RelOptRuleCall call)
+  {
+    final Union unionRel = call.rel(0);
+    final DruidRel<?> firstDruidRel = call.rel(1);
+    final DruidQueryRel secondDruidRel = call.rel(2);
+
+    // Can only do UNION ALL of inputs that have compatible schemas (or schema mappings).
+    //
+    // Also require exactly two inputs. The operands of this rule only describe the first two inputs, and onMatch
+    // either rebuilds the union from (first rel's inputs + second rel) or wraps unionRel as-is. If the planner ever
+    // presented a Union with more inputs, the extra inputs would be dropped or would go unchecked for compatibility.
+    return unionRel.all
+           && unionRel.getInputs().size() == 2
+           && isUnionCompatible(firstDruidRel, secondDruidRel);
+  }
+
+  /**
+   * Replaces the matched {@link Union} with a {@link DruidUnionDataSourceRel}.
+   *
+   * When the first input is already a {@link DruidUnionDataSourceRel}, its inputs are flattened together with the
+   * second input into one new Union; otherwise the matched Union is wrapped directly.
+   */
+  @Override
+  public void onMatch(final RelOptRuleCall call)
+  {
+    final Union matchedUnion = call.rel(0);
+    final DruidRel<?> left = call.rel(1);
+    final DruidQueryRel right = call.rel(2);
+
+    final Union target;
+    if (left instanceof DruidUnionDataSourceRel) {
+      // Flatten: the existing union's inputs followed by the new right-hand input become a single Union.
+      final List<RelNode> leftInputs = left.getInputs();
+      target = (Union) call.builder()
+                           .pushAll(leftInputs)
+                           .push(right)
+                           .union(true, leftInputs.size() + 1)
+                           .build();
+    } else if (left instanceof DruidQueryRel) {
+      target = matchedUnion;
+    } else {
+      // Sanity check: only DruidQueryRel or DruidUnionDataSourceRel are expected as the first input.
+      throw new ISE("Expected first rel to be a DruidQueryRel, but it was %s", left.getClass().getName());
+    }
+
+    call.transformTo(
+        DruidUnionDataSourceRel.create(
+            target,
+            getColumnNamesIfTableOrUnion(left).get(),
+            left.getQueryMaker()
+        )
+    );
+  }
+
+  /**
+   * Two rels are union-compatible when the first resolves to a list of column names and the second resolves to the
+   * same list.
+   */
+  private static boolean isUnionCompatible(final DruidRel<?> first, final DruidRel<?> second)
+  {
+    final Optional<List<String>> firstColumns = getColumnNamesIfTableOrUnion(first);
+    if (!firstColumns.isPresent()) {
+      return false;
+    }
+    return firstColumns.equals(getColumnNamesIfTableOrUnion(second));
+  }
+
+  static Optional<List<String>> getColumnNamesIfTableOrUnion(final DruidRel<?> druidRel)
+  {
+    final PartialDruidQuery partialQuery = druidRel.getPartialDruidQuery();
+
+    final Optional<DruidTable> druidTable =
+        DruidRels.druidTableIfLeafRel(druidRel)
+                 .filter(table -> table.getDataSource() instanceof TableDataSource);
+
+    if (druidTable.isPresent() && DruidRels.isScanOrMapping(druidRel, false)) {
+      // This rel is a table scan or mapping.
+
+      if (partialQuery.stage() == PartialDruidQuery.Stage.SCAN) {
+        return Optional.of(druidTable.get().getRowSignature().getColumnNames());
+      } else {
+        // Sanity check. Expected to be true due to the "scan or mapping" check.
+        if (partialQuery.stage() != PartialDruidQuery.Stage.SELECT_PROJECT) {
+          throw new ISE("Expected stage %s but got %s", PartialDruidQuery.Stage.SELECT_PROJECT, partialQuery.stage());
+        }
+
+        // Apply the mapping (with additional sanity checks).
+        final RowSignature tableSignature = druidTable.get().getRowSignature();
+        final Mappings.TargetMapping mapping = partialQuery.getSelectProject().getMapping();
+
+        if (mapping.getSourceCount() != tableSignature.size()) {
+          throw new ISE(
+              "Expected mapping with %d columns but got %d columns",

Review comment:
       nit: would including the signatures themselves, rather than just the column counts, make this error message more useful?

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidRels.java
##########
@@ -59,12 +59,14 @@ public static boolean isScanOrMapping(final DruidRel<?> druidRel, final boolean
   /**
    * Check if a druidRel is a simple table scan or a scan + projection.
    *
-   * @param druidRel  the rel to check
-   * @param canBeJoin consider a 'join' that doesn't do anything fancy to be a scan-or-mapping too.
+   * @param druidRel         the rel to check
+   * @param canBeJoinOrUnion consider a {@link DruidJoinQueryRel} or {@link DruidUnionDataSourceRel} as possible
+   *                         scans-and-mappings too.
    */
-  private static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoin)
+  private static boolean isScanOrProject(final DruidRel<?> druidRel, final boolean canBeJoinOrUnion)
   {
-    if (druidRel instanceof DruidQueryRel || (canBeJoin && druidRel instanceof DruidJoinQueryRel)) {
+    if (druidRel instanceof DruidQueryRel || (canBeJoinOrUnion && (druidRel instanceof DruidJoinQueryRel

Review comment:
       nit: I wonder if the `instanceof` checks here should be pushed into a couple of methods on `DruidRel`: one to check if it is a scan/projection, and one to check if it is an 'external' (or is there a better word?) scan/projection for join/union.

##########
File path: sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidUnionDataSourceRel.java
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.rel;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnionDataSource;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.table.RowSignatures;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Represents a query on top of a {@link UnionDataSource}. This is used to represent a "UNION ALL" of regular table
+ * datasources.
+ *
+ * See {@link DruidUnionRel} for a version that can union any set of queries together (not just regular tables),
+ * but also must be the outermost rel of a query plan. In the future we expect that {@link UnionDataSource} will gain
+ * the ability to union query datasources together, and then this class could replace {@link DruidUnionRel}.
+ */
+public class DruidUnionDataSourceRel extends DruidRel<DruidUnionDataSourceRel>
+{
+  // NOTE(review): placeholder table datasource named "__union__"; presumably used where a DataSource is needed
+  // before the real UnionDataSource is assembled — confirm against toDruidQuery.
+  private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__union__");
+
+  // The Calcite union whose inputs are turned into table datasources when the native query is built.
+  private final Union unionRel;
+  // Column names shared by the union's inputs, as supplied by the caller of create().
+  private final List<String> unionColumnNames;
+  // Query stages stacked on top of the union datasource.
+  private final PartialDruidQuery partialQuery;
+
+  private DruidUnionDataSourceRel(
+      final RelOptCluster cluster,
+      final RelTraitSet traitSet,
+      final Union unionRel,
+      final List<String> unionColumnNames,
+      final PartialDruidQuery partialQuery,
+      final QueryMaker queryMaker
+  )
+  {
+    super(cluster, traitSet, queryMaker);
+    this.partialQuery = partialQuery;
+    this.unionColumnNames = unionColumnNames;
+    this.unionRel = unionRel;
+  }
+
+  /**
+   * Builds a rel over {@code unionRel}, reusing its cluster and traits and starting from a fresh
+   * {@link PartialDruidQuery} rooted at the union.
+   */
+  public static DruidUnionDataSourceRel create(
+      final Union unionRel,
+      final List<String> unionColumnNames,
+      final QueryMaker queryMaker
+  )
+  {
+    final RelOptCluster cluster = unionRel.getCluster();
+    final RelTraitSet traitSet = unionRel.getTraitSet();
+    final PartialDruidQuery initialQuery = PartialDruidQuery.create(unionRel);
+    return new DruidUnionDataSourceRel(cluster, traitSet, unionRel, unionColumnNames, initialQuery, queryMaker);
+  }
+
+  /**
+   * Returns the column names this rel was created with.
+   */
+  public List<String> getUnionColumnNames()
+  {
+    return this.unionColumnNames;
+  }
+
+  /**
+   * Returns the query stages currently stacked on top of the union.
+   */
+  @Override
+  public PartialDruidQuery getPartialDruidQuery()
+  {
+    return this.partialQuery;
+  }
+
+  /**
+   * Returns a copy of this rel with {@code newQueryBuilder} as its partial query; the new query's traits are added
+   * to this rel's trait set.
+   */
+  @Override
+  public DruidUnionDataSourceRel withPartialQuery(final PartialDruidQuery newQueryBuilder)
+  {
+    final RelOptCluster cluster = getCluster();
+    final RelTraitSet mergedTraits = getTraitSet().plusAll(newQueryBuilder.getRelTraits());
+    return new DruidUnionDataSourceRel(
+        cluster,
+        mergedTraits,
+        unionRel,
+        unionColumnNames,
+        newQueryBuilder,
+        getQueryMaker()
+    );
+  }
+
+  @Override
+  public Sequence<Object[]> runQuery()
+  {
+    // Aggregations are intentionally not finalized here (toDruidQuery(false)). Reaching runQuery implies this is the
+    // outermost query, which runs as a native query, and the native layer finalizes the outermost query's
+    // aggregations on its own.
+    final QueryMaker maker = getQueryMaker();
+    return maker.runQuery(toDruidQuery(false));
+  }
+
+  @Override
+  public DruidQuery toDruidQuery(final boolean finalizeAggregations)
+  {
+    final List<TableDataSource> dataSources = new ArrayList<>();
+    RowSignature signature = null;
+
+    for (final RelNode relNode : unionRel.getInputs()) {
+      final DruidRel<?> druidRel = (DruidRel<?>) relNode;
+      if (!DruidRels.isScanOrMapping(druidRel, false)) {
+        throw new CannotBuildQueryException(druidRel);
+      }
+
+      final DruidQuery query = druidRel.toDruidQuery(false);
+      final DataSource dataSource = query.getDataSource();
+      if (!(dataSource instanceof TableDataSource)) {
+        throw new CannotBuildQueryException(druidRel);
+      }
+
+      if (signature == null) {
+        signature = query.getOutputRowSignature();
+      }
+
+      if (signature.getColumnNames().equals(query.getOutputRowSignature().getColumnNames())) {

Review comment:
       nit: this could be an `else if` attached to the previous `if`, the one that assigns `signature` when it is still null.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org