You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/08/16 17:40:23 UTC

[beam] branch master updated: [BEAM-12759] ORDER BY then SELECT

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad92c6  [BEAM-12759] ORDER BY then SELECT
     new c3bc467  Merge pull request #15334 from apilloud/orderby
5ad92c6 is described below

commit 5ad92c662fd7e5a7108eb7e667d670b08d5ab0e9
Author: Andrew Pilloud <ap...@google.com>
AuthorDate: Fri Aug 13 17:24:47 2021 -0700

    [BEAM-12759] ORDER BY then SELECT
---
 .../LimitOffsetScanToOrderByLimitConverter.java            |  7 ++++---
 .../sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
index ad40241..d54ba3b 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/LimitOffsetScanToOrderByLimitConverter.java
@@ -64,7 +64,7 @@ class LimitOffsetScanToOrderByLimitConverter extends RelConverter<ResolvedLimitO
   @Override
   public RelNode convert(ResolvedLimitOffsetScan zetaNode, List<RelNode> inputs) {
     ResolvedOrderByScan inputOrderByScan = (ResolvedOrderByScan) zetaNode.getInputScan();
-    RelNode input = convertOrderByScanToLogicalScan(inputOrderByScan, inputs.get(0));
+    RelNode input = inputs.get(0);
     RelCollation relCollation = getRelCollation(inputOrderByScan);
 
     RexNode offset =
@@ -83,12 +83,13 @@ class LimitOffsetScanToOrderByLimitConverter extends RelConverter<ResolvedLimitO
       throw new UnsupportedOperationException("Limit requires non-null count and offset");
     }
 
-    return LogicalSort.create(input, relCollation, offset, fetch);
+    RelNode sorted = LogicalSort.create(input, relCollation, offset, fetch);
+    return convertOrderByScanToLogicalScan(inputOrderByScan, sorted);
   }
 
   /** Collation is a sort order, as in ORDER BY DESCENDING/ASCENDING. */
   private static RelCollation getRelCollation(ResolvedOrderByScan node) {
-    final long inputOffset = node.getColumnList().get(0).getId();
+    final long inputOffset = node.getInputScan().getColumnList().get(0).getId();
     List<RelFieldCollation> fieldCollations =
         node.getOrderByItemList().stream()
             .map(item -> orderByItemToFieldCollation(item, inputOffset))
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index 80725da..a802b30 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -1292,6 +1292,20 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testZetaSQLSelectFromTableOrderByNoSelectLimit() {
+    String sql = "SELECT Value FROM KeyValue ORDER BY Key DESC LIMIT 2;";
+    PCollection<Row> stream = execute(sql);
+
+    final Schema schema = Schema.builder().addStringField("field2").build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(schema).addValues("KeyValue234").build(),
+            Row.withSchema(schema).addValues("KeyValue235").build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testZetaSQLSelectFromTableOrderBy() {
     String sql = "SELECT Key, Value FROM KeyValue ORDER BY Key DESC;";
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);