Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 22:49:49 UTC

[GitHub] [beam] robinyqiu commented on a change in pull request #11807: [BEAM-9363] Support TUMBLE aggregation

robinyqiu commented on a change in pull request #11807:
URL: https://github.com/apache/beam/pull/11807#discussion_r431481621



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4780,6 +4780,31 @@ public void testTumbleAsTVF() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test
+  public void testTVFTumbleAggregation() {
+    String sql =
+        "SELECT COUNT(*) as field_count, "
+            + "window_start "
+            + "FROM TUMBLE((select * from KeyValue), descriptor(ts), 'INTERVAL 1 SECOND') "
+            + "GROUP BY window_start";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    final Schema schema =
+        Schema.builder().addInt64Field("count_start").addDateTimeField("window_start").build();

Review comment:
       Nit: `count_start` should be `field_count`, to match the `COUNT(*) as field_count` alias in the query.
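       For reference, the query in this test counts rows per one-second tumbling window and labels that count `field_count`. The library-free sketch below models what the aggregation computes. It is not Beam's implementation: the class, method, and field names are hypothetical, event timestamps are assumed to be epoch milliseconds, and windows are assumed to have zero offset. Each timestamp is mapped to the start of its fixed window, and rows are counted per start. The output names mirror the SQL aliases, and the expected schema should follow the same convention.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, library-free model of:
//   SELECT COUNT(*) AS field_count, window_start
//   FROM TUMBLE(..., descriptor(ts), 'INTERVAL 1 SECOND') GROUP BY window_start
class TumbleCountSketch {
  static final long WINDOW_SIZE_MILLIS = 1_000L; // 'INTERVAL 1 SECOND'

  // Start of the fixed (tumbling) window containing tsMillis, assuming zero offset.
  // floorMod keeps the result correct for timestamps before the epoch.
  static long windowStart(long tsMillis) {
    return tsMillis - Math.floorMod(tsMillis, WINDOW_SIZE_MILLIS);
  }

  // One output row; field names mirror the SQL aliases (field_count, window_start).
  static final class OutputRow {
    final long fieldCount;
    final long windowStart;

    OutputRow(long fieldCount, long windowStart) {
      this.fieldCount = fieldCount;
      this.windowStart = windowStart;
    }
  }

  // GROUP BY window_start with COUNT(*); the TreeMap returns rows ordered by window_start.
  static List<OutputRow> countPerWindow(List<Long> timestampsMillis) {
    Map<Long, Long> counts = new TreeMap<>();
    for (long ts : timestampsMillis) {
      counts.merge(windowStart(ts), 1L, Long::sum);
    }
    List<OutputRow> out = new ArrayList<>();
    for (Map.Entry<Long, Long> e : counts.entrySet()) {
      out.add(new OutputRow(e.getValue(), e.getKey()));
    }
    return out;
  }
}
```

       With timestamps 1000, 1500, 2999 and 3000 ms, this yields one window starting at 1000 with a `field_count` of 2 and two windows, starting at 2000 and 3000, with a count of 1 each.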

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -99,14 +102,32 @@ public TableFunctionScan copy(
       RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
       PCollection<Row> upstream = input.get(0);
       Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      return upstream
-          .apply(
-              ParDo.of(
-                  new FixedWindowDoFn(
-                      FixedWindows.of(durationParameter(call.getOperands().get(2))),
-                      wmCol.getIndex(),
-                      outputSchema)))
-          .setRowSchema(outputSchema);
+      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+      PCollection<Row> streamWithWindowMetadata =
+          upstream
+              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+              .setRowSchema(outputSchema);
+
+      PCollection<Row> windowedStream =
+          assignTimestampsAndWindow(
+              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+      return windowedStream;
+    }
+
+    /** Extract timestamps from the windowFieldIndex, then window into windowFns. */
+    private PCollection<Row> assignTimestampsAndWindow(
+        PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+      PCollection<Row> windowedStream;
+      windowedStream =
+          upstream

Review comment:
       Why declare `windowedStream` and then assign it on a separate line? Why not just `return upstream.apply(...)`?
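       To illustrate the suggestion without Beam, here is a hypothetical, library-free analogue of `assignTimestampsAndWindow`. Each row is assumed to be a `long[]` whose event timestamp (epoch millis) sits at `windowFieldIndex`, and windowing is reduced to computing the start of the row's fixed window. The two methods differ only in shape: one declares a local, assigns it, and returns it; the other returns the chained expression directly. Both produce the same result.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: rows are long[] with an epoch-millis timestamp at windowFieldIndex;
// "windowing" is simplified to computing each row's fixed-window start (zero offset).
class ReturnDirectlySketch {

  // Shape under review: declare a local, assign it separately, then return it.
  static List<Long> withLocal(List<long[]> upstream, int windowFieldIndex, long sizeMillis) {
    List<Long> windowedStream;
    windowedStream =
        upstream.stream()
            .map(row -> row[windowFieldIndex] - Math.floorMod(row[windowFieldIndex], sizeMillis))
            .collect(Collectors.toList());
    return windowedStream;
  }

  // Suggested shape: return the chained expression directly; behavior is unchanged.
  static List<Long> returnDirectly(List<long[]> upstream, int windowFieldIndex, long sizeMillis) {
    return upstream.stream()
        .map(row -> row[windowFieldIndex] - Math.floorMod(row[windowFieldIndex], sizeMillis))
        .collect(Collectors.toList());
  }
}
```

       The local variable adds nothing here: it is never reused or inspected before being returned, so the direct `return` is shorter and reads the same.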




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org