Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/27 22:51:56 UTC

[GitHub] [beam] amaliujia commented on a change in pull request #11807: [BEAM-9363] Support TUMBLE aggregation

amaliujia commented on a change in pull request #11807:
URL: https://github.com/apache/beam/pull/11807#discussion_r431486841



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -99,14 +102,32 @@ public TableFunctionScan copy(
       RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
       PCollection<Row> upstream = input.get(0);
       Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      return upstream
-          .apply(
-              ParDo.of(
-                  new FixedWindowDoFn(
-                      FixedWindows.of(durationParameter(call.getOperands().get(2))),
-                      wmCol.getIndex(),
-                      outputSchema)))
-          .setRowSchema(outputSchema);
+      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+      PCollection<Row> streamWithWindowMetadata =
+          upstream
+              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+              .setRowSchema(outputSchema);
+
+      PCollection<Row> windowedStream =
+          assignTimestampsAndWindow(
+              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+      return windowedStream;
+    }
+
+    /** Extract timestamps from the windowFieldIndex, then window into windowFns. */
+    private PCollection<Row> assignTimestampsAndWindow(
+        PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+      PCollection<Row> windowedStream;
+      windowedStream =
+          upstream

Review comment:
       Not a big deal. I just wanted to use the name `windowedStream` to improve readability, e.g. so readers can tell that the method returns a windowed PCollection.
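
       For context on what "windowed" means here, below is a minimal sketch in plain Java (no Beam dependency) of how a fixed, i.e. TUMBLE, window can be derived from an event timestamp. The class `TumbleSketch` and its methods are hypothetical names used only for illustration; they are not part of this PR or of Beam, and the sketch assumes a zero window offset.

```java
/**
 * Illustrative sketch only: computes the bounds of the fixed (tumbling)
 * window that contains a given timestamp. Hypothetical helper, not Beam code.
 * Assumes windows are aligned to the epoch (offset of zero).
 */
public final class TumbleSketch {

  private TumbleSketch() {}

  /** Returns the inclusive start of the window containing timestampMillis. */
  public static long windowStart(long timestampMillis, long sizeMillis) {
    if (sizeMillis <= 0) {
      throw new IllegalArgumentException("window size must be positive");
    }
    // floorMod keeps the remainder non-negative, so timestamps before the
    // epoch still land in the window that starts at or before them.
    return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
  }

  /** Returns the exclusive end of the window containing timestampMillis. */
  public static long windowEnd(long timestampMillis, long sizeMillis) {
    return windowStart(timestampMillis, sizeMillis) + sizeMillis;
  }

  public static void main(String[] args) {
    long size = 60_000L; // one-minute windows
    long ts = 125_000L;  // an event 2 min 5 s after the epoch
    System.out.println(
        "window = [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
  }
}
```

       Every row whose timestamp falls in the same half-open interval ends up in the same window, which is the property the name `windowedStream` is meant to signal to readers.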



