Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/03/08 23:01:37 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #13902: Window planning: use collation traits, improve subquery logic.

imply-cheddar commented on code in PR #13902:
URL: https://github.com/apache/druid/pull/13902#discussion_r1130158405


##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -109,53 +113,67 @@
   public static Windowing fromCalciteStuff(
       final PartialDruidQuery partialQuery,
       final PlannerContext plannerContext,
-      final RowSignature rowSignature,
+      final RowSignature sourceRowSignature,
       final RexBuilder rexBuilder
   )
   {
     final Window window = Preconditions.checkNotNull(partialQuery.getWindow(), "window");
 
     ArrayList<OperatorFactory> ops = new ArrayList<>();
 
-    final List<String> expectedOutputColumns = new ArrayList<>(rowSignature.getColumnNames());
-    final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", rowSignature.getColumnNames());
+    final List<String> windowOutputColumns = new ArrayList<>(sourceRowSignature.getColumnNames());
+    final String outputNamePrefix = Calcites.findUnusedPrefixForDigits("w", sourceRowSignature.getColumnNames());
     int outputNameCounter = 0;
+
+    // Track prior partition columns and sort columns group-to-group, so we only insert sorts and repartitions if
+    // we really need to.
+    List<String> priorPartitionColumns = null;
+    LinkedHashSet<ColumnWithDirection> priorSortColumns = new LinkedHashSet<>();
+
+    final RelCollation priorCollation = partialQuery.getScan().getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+    if (priorCollation != null) {
+      // Populate initial priorSortColumns using collation of the input to the window operation. Allows us to skip
+      // the initial sort operator if the rows were already in the desired order.
+      priorSortColumns = computeSortColumnsFromRelCollation(priorCollation, sourceRowSignature);
+    }
+
     for (int i = 0; i < window.groups.size(); ++i) {
-      final WindowGroup group = new WindowGroup(window, window.groups.get(i), rowSignature);
+      final WindowGroup group = new WindowGroup(window, window.groups.get(i), sourceRowSignature);
 
-      if (i > 0) {
-        LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
-        for (String partitionColumn : group.getPartitionColumns()) {
-          sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
-        }
-        sortColumns.addAll(group.getOrdering());
+      final LinkedHashSet<ColumnWithDirection> sortColumns = new LinkedHashSet<>();
+      for (String partitionColumn : group.getPartitionColumns()) {
+        sortColumns.add(ColumnWithDirection.ascending(partitionColumn));
+      }
+      sortColumns.addAll(group.getOrdering());
 
+      // Add sorting and partitioning if needed.
+      if (!sortMatches(ImmutableList.copyOf(priorSortColumns), ImmutableList.copyOf(sortColumns))) {

Review Comment:
   Why copy these into lists?  Given that `sortMatches` is doing a prefix match anyway, you might as well just pass in two iterators.



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java:
##########
@@ -1479,16 +1484,16 @@ private ScanQuery toScanQuery()
         // Cannot handle this ordering.
         // Scan cannot ORDER BY non-time columns.
         plannerContext.setPlanningError(
-            "SQL query requires order by non-time column %s that is not supported.",
+            "SQL query requires order by non-time column %s, which is not supported.",
             orderByColumns
         );
         return null;
       }
       if (!dataSource.isConcrete()) {
         // Cannot handle this ordering.
-        // Scan cannot ORDER BY non-time columns.
+        // Scan cannot ORDER BY non-concrete datasources on _any_ column.
         plannerContext.setPlanningError(
-            "SQL query is a scan and requires order by on a datasource[%s], which is not supported.",
+            "SQL query requires order by on non-concrete datasource, which is not supported.",
             dataSource

Review Comment:
   You dropped the interpolation of the datasource value: `dataSource` is still passed as an argument, but the new format string has no `%s` to receive it.  In a complex query that does lots and lots of stuff, an error message with nothing interpolated to tell you what you did wrong makes it nearly impossible to actually fix your query.
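
   As a rough sketch of a message that keeps the datasource, here is a stand-alone version using plain `String.format` in place of `plannerContext.setPlanningError`.  The class and method names are hypothetical, and the bracketed `datasource[%s]` form simply follows the style of the removed line.

```java
final class PlanningErrorMessages
{
  private PlanningErrorMessages() {}

  // Hypothetical helper: builds the planning error with the offending datasource interpolated,
  // so the reader of the message can tell which part of a large query is at fault.
  static String nonConcreteOrderBy(final Object dataSource)
  {
    return String.format(
        "SQL query requires order by on non-concrete datasource[%s], which is not supported.",
        dataSource
    );
  }

  public static void main(String[] args)
  {
    // "exampleDataSource" is a made-up value for illustration only.
    System.out.println(nonConcreteOrderBy("exampleDataSource"));
  }
}
```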



##########
sql/src/test/resources/calcite/tests/window/wikipediaScanWindow.sqlTest:
##########
@@ -0,0 +1,23 @@
+type: "operatorValidation"
+
+sql: |
+  SELECT
+    __time,
+    "user",
+    page,
+    LAG(page, 1) OVER (PARTITION BY "user" ORDER BY __time) as priorPage
+  FROM wikipedia
+
+expectedOperators:
+  - { type: "naiveSort", columns: [ { column: "user", direction: "ASC" }, { column: "__time", direction: "ASC" } ]}
+  - { type: "naivePartition",  partitionColumns: [ "user" ] }
+  - type: "window"
+    processor:
+      type: "offset"
+      inputColumn: page
+      outputColumn: w0
+      offset: -1
+
+# Not correct: there should actually be results here. Therefore, currently, this test only verifies that the
+# query is planned as expected, not that the results are correct.
+expectedResults: []

Review Comment:
   Is the test not failing?  I would expect this test to fail, reporting that the results are different.  If we want plan-only tests, I think I'd want the `type` to become `operatorPlanValidation` and then the test harness to only look at the expected operators and ignore the results.
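
   To make the plan-only idea concrete, here is a minimal sketch of how a harness could gate the result comparison on the test type.  Everything in it is hypothetical (the enum, the `verify` method, and the use of plain lists for operators and rows); it is not the existing harness code, only an illustration of the proposed behavior.

```java
import java.util.List;

// Hypothetical test types: only "operatorValidation" exists in the test file above;
// OPERATOR_PLAN_VALIDATION is the proposed plan-only variant.
enum WindowTestType
{
  OPERATOR_VALIDATION(true),
  OPERATOR_PLAN_VALIDATION(false);

  private final boolean verifiesResults;

  WindowTestType(final boolean verifiesResults)
  {
    this.verifiesResults = verifiesResults;
  }

  boolean verifiesResults()
  {
    return verifiesResults;
  }
}

final class WindowTestCheck
{
  private WindowTestCheck() {}

  // Always compares the planned operators; compares results only when the type asks for it.
  static void verify(
      final WindowTestType type,
      final List<String> expectedOperators,
      final List<String> actualOperators,
      final List<List<Object>> expectedResults,
      final List<List<Object>> actualResults
  )
  {
    if (!expectedOperators.equals(actualOperators)) {
      throw new AssertionError("Operators differ: expected " + expectedOperators + " but got " + actualOperators);
    }
    if (type.verifiesResults() && !expectedResults.equals(actualResults)) {
      throw new AssertionError("Results differ: expected " + expectedResults + " but got " + actualResults);
    }
  }
}
```

   With something like this, a file that declares `expectedResults: []` would only pass under the plan-only type, which makes the intent of the test visible in the file itself.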



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -396,4 +434,59 @@ public int getConstantInt(int argPosition)
       return ((Number) getConstantArgument(argPosition).getValue()).intValue();
     }
   }
+
+  /**
+   * Return a list of {@link ColumnWithDirection} corresponding to a {@link RelCollation}.
+   *
+   * @param collation          collation
+   * @param sourceRowSignature signature of the collated rows
+   */
+  private static LinkedHashSet<ColumnWithDirection> computeSortColumnsFromRelCollation(
+      final RelCollation collation,
+      final RowSignature sourceRowSignature
+  )
+  {
+    final LinkedHashSet<ColumnWithDirection> retVal = new LinkedHashSet<>();
+
+    for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+      ColumnWithDirection.Direction direction = null;
+
+      switch (fieldCollation.getDirection()) {
+        case ASCENDING:
+        case STRICTLY_ASCENDING:
+          direction = ColumnWithDirection.Direction.ASC;
+          break;
+
+        case DESCENDING:
+        case STRICTLY_DESCENDING:
+          direction = ColumnWithDirection.Direction.DESC;
+          break;
+      }

Review Comment:
   If one of the fieldCollation values is using `CLUSTERED` direction, I *think* that this code will assume that the data is not sorted from that point forward and return?  Is that intended behavior?  If so, it would be a lot more explicit to have a case in the switch that covers it and returns retVal immediately instead of relying on a `null` value and then falling through.
   
   Alternatively, we could generate an error message?  I'm unsure why this would receive a CLUSTERED direction, so not sure which is correct.
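
   A sketch of the explicit-case version, assuming the intended behavior is "stop at the first `CLUSTERED` field".  To keep it stand-alone, the two enums below are local stand-ins for Calcite's `RelFieldCollation.Direction` and for `ColumnWithDirection.Direction`, and the method works on directions only rather than on real `RelFieldCollation` objects; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CollationPrefixSketch
{
  // Stand-in for Calcite's RelFieldCollation.Direction (same constant names).
  enum CalciteDirection { ASCENDING, STRICTLY_ASCENDING, DESCENDING, STRICTLY_DESCENDING, CLUSTERED }

  // Stand-in for ColumnWithDirection.Direction.
  enum SortDirection { ASC, DESC }

  private CollationPrefixSketch() {}

  // Returns the sort directions of the leading fields, stopping at the first CLUSTERED field.
  static List<SortDirection> leadingSortDirections(final List<CalciteDirection> fieldDirections)
  {
    final List<SortDirection> retVal = new ArrayList<>();

    for (CalciteDirection direction : fieldDirections) {
      switch (direction) {
        case ASCENDING:
        case STRICTLY_ASCENDING:
          retVal.add(SortDirection.ASC);
          break;

        case DESCENDING:
        case STRICTLY_DESCENDING:
          retVal.add(SortDirection.DESC);
          break;

        case CLUSTERED:
          // Clustered values are grouped but not ordered, so nothing from this field
          // onward can be treated as sorted. Return what we have, explicitly.
          return retVal;

        default:
          throw new IllegalStateException("Unexpected direction: " + direction);
      }
    }

    return retVal;
  }
}
```

   If an error turns out to be the right answer instead, the `CLUSTERED` case is the single place to throw it.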



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -396,4 +434,59 @@ public int getConstantInt(int argPosition)
       return ((Number) getConstantArgument(argPosition).getValue()).intValue();
     }
   }
+
+  /**
+   * Return a list of {@link ColumnWithDirection} corresponding to a {@link RelCollation}.
+   *
+   * @param collation          collation
+   * @param sourceRowSignature signature of the collated rows
+   */
+  private static LinkedHashSet<ColumnWithDirection> computeSortColumnsFromRelCollation(
+      final RelCollation collation,
+      final RowSignature sourceRowSignature
+  )
+  {
+    final LinkedHashSet<ColumnWithDirection> retVal = new LinkedHashSet<>();
+
+    for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
+      ColumnWithDirection.Direction direction = null;
+
+      switch (fieldCollation.getDirection()) {
+        case ASCENDING:
+        case STRICTLY_ASCENDING:
+          direction = ColumnWithDirection.Direction.ASC;
+          break;
+
+        case DESCENDING:
+        case STRICTLY_DESCENDING:
+          direction = ColumnWithDirection.Direction.DESC;
+          break;
+      }
+
+      if (direction != null) {
+        final ColumnWithDirection columnWithDirection = new ColumnWithDirection(
+            sourceRowSignature.getColumnName(fieldCollation.getFieldIndex()),
+            direction
+        );
+
+        retVal.add(columnWithDirection);
+      } else {
+        break;
+      }
+    }
+
+    return retVal;
+  }
+
+  /**
+   * Whether currentSort is a prefix of priorSort. (i.e., whether data sorted by priorSort is *also* sorted
+   * by currentSort.)
+   */
+  private static boolean sortMatches(
+      final List<ColumnWithDirection> priorSort,
+      final List<ColumnWithDirection> currentSort
+  )
+  {
+    return currentSort.size() <= priorSort.size() && currentSort.equals(priorSort.subList(0, currentSort.size()));
+  }

Review Comment:
   This is a pretty high-garbage way of doing things (two `ImmutableList.copyOf` copies at the call site plus a `subList` view), and it doesn't seem much simpler than just having two iterators and walking them together.
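
   Roughly what I have in mind, as a stand-alone sketch: it is generic over the element type so it compiles without `ColumnWithDirection`, and the class name is hypothetical.  It keeps the same contract as the javadoc above (true when `currentSort` is a prefix of `priorSort`) and can take the `LinkedHashSet`s directly.

```java
import java.util.Iterator;
import java.util.Objects;

final class SortPrefixSketch
{
  private SortPrefixSketch() {}

  // Whether currentSort is a prefix of priorSort, compared element by element in iteration order.
  // Walks both sequences together; no intermediate lists are created.
  static <T> boolean sortMatches(final Iterable<T> priorSort, final Iterable<T> currentSort)
  {
    final Iterator<T> priorIter = priorSort.iterator();

    for (T current : currentSort) {
      // Fails if priorSort runs out first, or if the elements at this position differ.
      if (!priorIter.hasNext() || !Objects.equals(current, priorIter.next())) {
        return false;
      }
    }

    return true;
  }
}
```

   The call site would then become `sortMatches(priorSortColumns, sortColumns)` with no `ImmutableList.copyOf` on either argument.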



##########
sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java:
##########
@@ -206,17 +224,37 @@ public static Windowing fromCalciteStuff(
       // The ordering required for partitioning is actually not important for the semantics.  However, it *is*
       // important that it be consistent across the query.  Because if the incoming data is sorted descending
       // and we try to partition on an ascending sort, we will think the data is not sorted correctly

Review Comment:
   I have a feeling that this comment has gone stale with your changes?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

