Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/29 23:05:30 UTC

[GitHub] [beam] amaliujia opened a new pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

amaliujia opened a new pull request #11868:
URL: https://github.com/apache/beam/pull/11868


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-639251081


   Thank you Andrew! Will address your comments soon!





[GitHub] [beam] amaliujia commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r437064277



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));

Review comment:
       I am planning to keep it for now. Because we will support user-defined table-valued functions (UDTVFs) in the near future, it seems to me that this approach will be extensible to UDTVFs.

Review comment:
       If it turns out that UDTVFs go to a different Rel, or to the same Rel but a different code path, I will change this back to a 'switch case'.
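The trade-off described in the two comments above can be sketched in plain Java. This is a minimal illustration, not the PR's code: `TvfDispatchSketch`, `register`, and `expand` are hypothetical names, and a `Function<String, String>` stands in for `TVFToPTransform` so the example runs without Beam or Calcite on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of dispatching on the TVF operator name through a map. */
final class TvfDispatchSketch {
  // Stand-in for a map from operator name to TVFToPTransform (assumption: strings
  // replace RexCall and PCollection<Row> so the sketch is self-contained).
  private static final Map<String, Function<String, String>> HANDLERS = new HashMap<>();

  static {
    HANDLERS.put("TUMBLE", input -> "fixed windows over " + input);
    HANDLERS.put("HOP", input -> "sliding windows over " + input);
    HANDLERS.put("SESSION", input -> "session windows over " + input);
  }

  /** A new TVF is added by registering a handler; expand() itself does not change. */
  static void register(String operatorName, Function<String, String> handler) {
    HANDLERS.put(operatorName, handler);
  }

  static String expand(String operatorName, String input) {
    Function<String, String> handler = HANDLERS.get(operatorName);
    if (handler == null) {
      throw new IllegalArgumentException(
          "Only support " + HANDLERS.keySet() + " table-valued functions. Current operator: "
              + operatorName);
    }
    return handler.apply(input);
  }

  public static void main(String[] args) {
    System.out.println(expand("HOP", "orders"));
    register("MY_UDTVF", input -> "user-defined expansion of " + input);
    System.out.println(expand("MY_UDTVF", "orders"));
  }
}
```

With a 'switch case', each new function would need a new branch inside the dispatch method; with the map, only the registration changes, which is the extensibility argument made above.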

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */
+public interface TVFToPTransform {

Review comment:
       Done. 

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+/**
+ * TVFSlidingWindowFn assigns window based on input row's "window_start" and "window_end"
+ * timestamps.
+ */
+public class TVFSlidingWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
+  /** Amount of time between generated windows. */
+  private final Duration period;
+
+  /** Size of the generated windows. */
+  private final Duration size;
+
+  public static TVFSlidingWindowFn of(Duration size, Duration period) {
+    return new TVFSlidingWindowFn(size, period);
+  }
+
+  private TVFSlidingWindowFn(Duration size, Duration period) {
+    this.period = period;
+    this.size = size;
+  }
+
+  @Override
+  public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
+    Row curRow = (Row) c.element();
+    // In sliding window as TVF syntax, each row contains its window's start and end as metadata,
+    // thus we can assign a window directly based on the window's start and end metadata.
+    return Arrays.asList(
+        new IntervalWindow(
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(),
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant()));
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return equals(other);
+  }
+
+  @Override
+  public Coder<IntervalWindow> windowCoder() {
+    return IntervalWindow.getCoder();
+  }
+
+  @Override
+  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
+    throw new UnsupportedOperationException(
+        "TVFSlidingWindow does not support side input windows.");
+  }
+
+  @Override

Review comment:
       Done. Thanks!
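For readers following the HOP path: the Javadoc in the hunk above says `TVFSlidingWindowFn` assigns a window from each row's `window_start` and `window_end`, and elsewhere in this PR `SlidingWindowDoFn` emits one output row per sliding window that contains the row's timestamp. The arithmetic behind that expansion can be sketched as follows. This is a hedged illustration with plain `long` timestamps and zero offset, not Beam's implementation; `HopWindowSketch`, `assignWindows`, and `expandRow` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: which sliding windows [start, start + size) contain a timestamp. */
final class HopWindowSketch {
  /** Returns {start, end} pairs, latest window first. Assumes size and period are positive. */
  static List<long[]> assignWindows(long timestamp, long size, long period) {
    List<long[]> windows = new ArrayList<>();
    // Latest window start at or before the timestamp, aligned to a multiple of the period.
    long lastStart = timestamp - Math.floorMod(timestamp, period);
    // Walk backwards while the window still covers the timestamp (end is exclusive).
    for (long start = lastStart; start > timestamp - size; start -= period) {
      windows.add(new long[] {start, start + size});
    }
    return windows;
  }

  /** Emits one copy of the row per window, with the window bounds appended as metadata. */
  static List<String> expandRow(String row, long timestamp, long size, long period) {
    List<String> out = new ArrayList<>();
    for (long[] w : assignWindows(timestamp, size, period)) {
      out.add(row + ", window_start=" + w[0] + ", window_end=" + w[1]);
    }
    return out;
  }

  public static void main(String[] args) {
    // Timestamp 7 with size 10 and period 5 falls into [5, 15) and [0, 10).
    for (String line : expandRow("id=1", 7, 10, 5)) {
      System.out.println(line);
    }
  }
}
```

Because every emitted row already carries its own bounds, a downstream window function only needs to read those two columns back, which is the role the Javadoc gives `TVFSlidingWindowFn`.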

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+    private TVFToPTransform tumbleToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform hopToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+          Duration size = durationParameter(call.getOperands().get(2));
+          Duration period = durationParameter(call.getOperands().get(3));
+          SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          // Sliding window needs this special WindowFn to assign windows based on window_start,
+          // window_end metadata.
+          WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform sessionToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Duration gap = durationParameter(call.getOperands().get(2));
+
+          Sessions sessions = Sessions.withGapDuration(gap);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          // To extract the session's window metadata, we apply a GroupByKey with a dummy key.
+          // This is needed because session is a merging window: its bounds are only final
+          // after merging. After the GBK, SessionWindowDoFn extracts the window_start and
+          // window_end metadata.
+          PCollection<Row> streamWithWindowMetadata =
+              windowedStream
+                  .apply(WithKeys.of("dummy"))

Review comment:
       I don't know whether there is any documentation. This is indeed the way to get SESSION_END (I found it in our codebase).
   
   For the scalability concern, note that GBK works on a per-key and per-window basis, so at least this GBK is also grouped by window, which reduces hot keys.
   
   Of course, there might still be hot keys if there is a very large SESSION window. I have logged https://jira.apache.org/jira/browse/CALCITE-4051 as an improvement idea.
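To make the point above concrete, here is a small plain-Java simulation of why the session bounds are only known after grouping, and why one large session means one large group. It is a sketch under stated assumptions, not Beam's `Sessions` implementation: timestamps are plain `long` values, each element starts with a half-open proto-window `[ts, ts + gap)`, and `SessionWindowSketch` and `mergeSessions` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: merge per-element proto-windows into sessions for a single key. */
final class SessionWindowSketch {
  /** Returns one {window_start, window_end, elementCount} triple per merged session. */
  static List<long[]> mergeSessions(long[] timestamps, long gap) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);
    List<long[]> sessions = new ArrayList<>();
    for (long ts : sorted) {
      long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
      if (last != null && ts < last[1]) {
        // The element's proto-window overlaps the open session: extend it and count the element.
        last[1] = Math.max(last[1], ts + gap);
        last[2]++;
      } else {
        // No overlap: this element opens a new session.
        sessions.add(new long[] {ts, ts + gap, 1});
      }
    }
    return sessions;
  }

  public static void main(String[] args) {
    // With a gap of 5, timestamps 1, 3, 4 merge into [1, 9); 20 and 22 merge into [20, 27).
    for (long[] s : mergeSessions(new long[] {1, 3, 4, 20, 22}, 5)) {
      System.out.println("window_start=" + s[0] + ", window_end=" + s[1] + ", elements=" + s[2]);
    }
  }
}
```

The element count per session is the amount of data that a single grouped value would hold under one shared key, so the two sessions above are independent groups, while a single very long session would concentrate all of its elements in one group.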

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));
     }
 
     /** Extract timestamps from the windowFieldIndex, then window into windowFns. */
     private PCollection<Row> assignTimestampsAndWindow(
-        PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+        PCollection<Row> upstream,
+        int windowFieldIndex,
+        WindowFn<Object, IntervalWindow> windowFn) {

Review comment:
       I now cast `sessions` to `WindowFn`, so the element type here is `Row`.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog catalog, AnalyzerOptions
     // TUMBLE
     catalog.addTableValuedFunction(
         new TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
-            ImmutableList.of("TUMBLE"),
+            ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+            new FunctionSignature(
+                retType, ImmutableList.of(inputTableType, descriptorType, stringType), 123),

Review comment:
       Good point. I changed it back to `-1`. I have seen `-1` used in the internal codebase.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
     }
   }
 
+  private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+    private final int windowFieldIndex;
+    private final SlidingWindows windowFn;
+    private final Schema outputSchema;
+
+    public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema schema) {
+      this.windowFn = windowFn;
+      this.windowFieldIndex = windowFieldIndex;
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      Collection<IntervalWindow> windows =
+          windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+      for (IntervalWindow window : windows) {
+        Row.Builder builder = Row.withSchema(outputSchema);
+        builder.addValues(row.getValues());
+        builder.addValue(window.start());
+        builder.addValue(window.end());
+        c.output(builder.build());
+      }
+    }
+  }
+
+  private static class SessionWindowDoFn extends DoFn<KV<String, Iterable<Row>>, Row> {
+    private final Schema outputSchema;
+
+    public SessionWindowDoFn(Schema schema) {
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      for (Row cur : element.getValue()) {

Review comment:
       See the scalability comment above. Because GBK works on a per-key and per-window basis, and I used the "dummy" key above, this ParDo will go through every element in a session window. If that window is large, this could cause a problem. (Hot keys are always a concern for any GBK.)
   
   Logged https://jira.apache.org/jira/browse/CALCITE-4051 for future improvement.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
       Have moved all streaming tests to another file.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+    private TVFToPTransform tumbleToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform hopToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+          Duration size = durationParameter(call.getOperands().get(2));
+          Duration period = durationParameter(call.getOperands().get(3));
+          SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          // Sliding window needs this special WindowFn to assign windows based on window_start,
+          // window_end metadata.
+          WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform sessionToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Duration gap = durationParameter(call.getOperands().get(2));
+
+          Sessions sessions = Sessions.withGapDuration(gap);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          // To extract the session's window metadata, we apply a GroupByKey with a dummy key.
+          // This is needed because session is a merging window: its bounds are only final
+          // after merging. After the GBK, SessionWindowDoFn extracts the window_start and
+          // window_end metadata.
+          PCollection<Row> streamWithWindowMetadata =
+              windowedStream
+                  .apply(WithKeys.of("dummy"))

Review comment:
       I don't know whether there is any documentation. This is indeed the way to get SESSION_END (I found it in our codebase).
   
   For the scalability concern, note that GBK works on a per-key and per-window basis, so at least this GBK is also grouped by window, which reduces hot keys.
   
   Of course, there might still be hot keys if there is a very large SESSION window. I have logged https://jira.apache.org/jira/browse/BEAM-10216 as an improvement idea.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),

Review comment:
       Good point!

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
     }
   }
 
+  private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+    private final int windowFieldIndex;
+    private final SlidingWindows windowFn;
+    private final Schema outputSchema;
+
+    public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema schema) {
+      this.windowFn = windowFn;
+      this.windowFieldIndex = windowFieldIndex;
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      Collection<IntervalWindow> windows =
+          windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+      for (IntervalWindow window : windows) {
+        Row.Builder builder = Row.withSchema(outputSchema);
+        builder.addValues(row.getValues());
+        builder.addValue(window.start());
+        builder.addValue(window.end());
+        c.output(builder.build());
+      }
+    }
+  }
+
+  private static class SessionWindowDoFn extends DoFn<KV<String, Iterable<Row>>, Row> {
+    private final Schema outputSchema;
+
+    public SessionWindowDoFn(Schema schema) {
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      for (Row cur : element.getValue()) {

Review comment:
       See the scalability comment above. Because GBK works on a per-key, per-window basis and I used a "dummy" key above, this ParDo will go through every element in a session window. If that window is large, this could cause a problem. (Hot keys are always a concern for any GBK.)
   
   Logged https://jira.apache.org/jira/browse/BEAM-10216 for future improvement.
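
To make the hot-key concern above concrete, here is a minimal plain-Java sketch, not Beam code. `HotKeySketch`, `groupBy`, and `largestGroup` are hypothetical names, a `Map` stands in for the per-key grouping that GroupByKey performs, and the `user1:a` row format used in the note below is made up for illustration. It only shows that a constant key puts every row into one group, whereas a real key column would split the work.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java stand-in for a per-key grouping; this is not Beam's GroupByKey. */
final class HotKeySketch {
  /** Groups values by the key that keyFn assigns, preserving first-seen key order. */
  static <T> Map<String, List<T>> groupBy(List<T> values, Function<T, String> keyFn) {
    Map<String, List<T>> groups = new LinkedHashMap<>();
    for (T value : values) {
      groups.computeIfAbsent(keyFn.apply(value), k -> new ArrayList<>()).add(value);
    }
    return groups;
  }

  /** Size of the largest group, i.e. the most elements one per-key iteration must visit. */
  static <T> int largestGroup(Map<String, List<T>> groups) {
    int max = 0;
    for (List<T> group : groups.values()) {
      max = Math.max(max, group.size());
    }
    return max;
  }
}
```

For the four rows `user1:a`, `user2:b`, `user1:c`, `user3:d`, keying everything by the constant `"dummy"` yields a single group of four, while keying by the part before the colon yields three groups, the largest of size two.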




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-640953451


   I have tried to add more tests from BeamSQL Calcite to bring the TVF syntax to a level of testing similar to that of the existing GROUP BY windows in BeamSQL Calcite.





[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-636230045


   One thing worth mentioning: with this PR, we can finally give a correct SESSION window end for each row.
   
   In the past, SESSION_END always equaled SESSION_START in the GROUP BY windowing syntax.
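
As a rough illustration of where a per-row SESSION end comes from, the sketch below merges timestamps into sessions in plain Java. It assumes the usual session semantics (each element starts with a window of [timestamp, timestamp + gap), and overlapping windows merge). `SessionBoundsSketch` and `sessions` are hypothetical names, not part of Beam or of this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative session merging over epoch-millisecond timestamps. */
final class SessionBoundsSketch {
  /** Returns {start, end} pairs in milliseconds; end is exclusive. */
  static List<long[]> sessions(long[] timestamps, long gapMillis) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);
    List<long[]> result = new ArrayList<>();
    for (long ts : sorted) {
      long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
      if (last != null && ts < last[1]) {
        // The element's own window [ts, ts + gap) overlaps the open session: extend it.
        last[1] = Math.max(last[1], ts + gapMillis);
      } else {
        // No overlap: the element opens a new session.
        result.add(new long[] {ts, ts + gapMillis});
      }
    }
    return result;
  }
}
```

With a 5-second gap, elements at 0 s, 3 s, and 4 s share one session of [0 s, 9 s), so the session end no longer equals the session start.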





[GitHub] [beam] amaliujia commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r436188166



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
       Good point. It would be a good idea to move all streaming tests to another place.







[GitHub] [beam] robinyqiu commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
robinyqiu commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r436178923



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
       +1. IIRC there are some other tests in this file that test our streaming extension. It makes sense to move them into a separate file.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */
+public interface TVFToPTransform {

Review comment:
       +1







[GitHub] [beam] apilloud commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r435497089



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+/**
+ * TVFSlidingWindowFn assigns window based on input row's "window_start" and "window_end"
+ * timestamps.
+ */
+public class TVFSlidingWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {
+  /** Amount of time between generated windows. */
+  private final Duration period;
+
+  /** Size of the generated windows. */
+  private final Duration size;
+
+  public static TVFSlidingWindowFn of(Duration size, Duration period) {
+    return new TVFSlidingWindowFn(size, period);
+  }
+
+  private TVFSlidingWindowFn(Duration size, Duration period) {
+    this.period = period;
+    this.size = size;
+  }
+
+  @Override
+  public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
+    Row curRow = (Row) c.element();
+    // In sliding window as TVF syntax, each row contains's its window's start and end as metadata,
+    // thus we can assign a window directly based on window's start and end metadata.
+    return Arrays.asList(
+        new IntervalWindow(
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(),
+            curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant()));
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    return equals(other);
+  }
+
+  @Override
+  public Coder<IntervalWindow> windowCoder() {
+    return IntervalWindow.getCoder();
+  }
+
+  @Override
+  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
+    throw new UnsupportedOperationException(
+        "TVFSlidingWindow does not support side input windows.");
+  }
+
+  @Override

Review comment:
       nit: From here to the end of the file is boilerplate that `AutoValue` does for you. Could you use that instead?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+    private TVFToPTransform tumbleToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform hopToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+          Duration size = durationParameter(call.getOperands().get(2));
+          Duration period = durationParameter(call.getOperands().get(3));
+          SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          // Sliding window needs this special WindowFn to assign windows based on window_start,
+          // window_end metadata.
+          WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform sessionToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Duration gap = durationParameter(call.getOperands().get(2));
+
+          Sessions sessions = Sessions.withGapDuration(gap);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          // To extract session's window metadata, we apply a GroupByKey with a dummy key. It is
+          // because
+          // session is merging window. After GBK, SessionWindowDoFn will help extract window_start,
+          // window_end metadata.
+          PCollection<Row> streamWithWindowMetadata =
+              windowedStream
+                  .apply(WithKeys.of("dummy"))

Review comment:
       This block looks really innovative to me! It looks like this is what makes `session_end` work? It also scares me: I don't see how it can work without creating hot key and scalability problems. Is there a doc explaining how this works?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));

Review comment:
       This approach works, but it increases complexity, reduces debuggability, and is more error-prone than a simple 'switch case' statement and static methods.
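
A sketch of the `switch`-based alternative suggested here, under simplifying assumptions: plain strings stand in for `RexCall` and `PCollection<Row>`, and `TvfDispatchSketch` and its static methods are hypothetical names rather than code from this PR.

```java
/** Illustrative dispatch over TVF operator names with a switch and static methods. */
final class TvfDispatchSketch {
  static String tumble(String input) {
    return "TUMBLE(" + input + ")";
  }

  static String hop(String input) {
    return "HOP(" + input + ")";
  }

  static String session(String input) {
    return "SESSION(" + input + ")";
  }

  /** Picks the translation for operatorName; unknown names fail in the default branch. */
  static String translate(String operatorName, String input) {
    switch (operatorName) {
      case "TUMBLE":
        return tumble(input);
      case "HOP":
        return hop(input);
      case "SESSION":
        return session(input);
      default:
        throw new IllegalArgumentException(
            "Unsupported table-valued function: " + operatorName);
    }
  }
}
```

In this shape the `default` branch doubles as the unsupported-operator check, and a stack trace names the static method that was called rather than an anonymous lambda.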

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -166,6 +247,54 @@ public void processElement(ProcessContext c) {
     }
   }
 
+  private static class SlidingWindowDoFn extends DoFn<Row, Row> {
+    private final int windowFieldIndex;
+    private final SlidingWindows windowFn;
+    private final Schema outputSchema;
+
+    public SlidingWindowDoFn(SlidingWindows windowFn, int windowFieldIndex, Schema schema) {
+      this.windowFn = windowFn;
+      this.windowFieldIndex = windowFieldIndex;
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Row row = c.element();
+      Collection<IntervalWindow> windows =
+          windowFn.assignWindows(row.getDateTime(windowFieldIndex).toInstant());
+      for (IntervalWindow window : windows) {
+        Row.Builder builder = Row.withSchema(outputSchema);
+        builder.addValues(row.getValues());
+        builder.addValue(window.start());
+        builder.addValue(window.end());
+        c.output(builder.build());
+      }
+    }
+  }
+
+  private static class SessionWindowDoFn extends DoFn<KV<String, Iterable<Row>>, Row> {
+    private final Schema outputSchema;
+
+    public SessionWindowDoFn(Schema schema) {
+      this.outputSchema = schema;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<Row>> element, BoundedWindow window, OutputReceiver<Row> out) {
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      for (Row cur : element.getValue()) {

Review comment:
       This is going to iterate over every element in the window; that won't work. You might be able to make something that works reasonably well with `CombineFn`. (I don't think this is something we will figure out in PR comments.)

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -607,7 +630,13 @@ private RexInputRef convertWatermarkedResolvedColumnToRexInputRef(
 
   private ResolvedColumn extractWatermarkColumnFromDescriptor(
       ResolvedNodes.ResolvedDescriptor descriptor) {
-    return descriptor.getDescriptorColumnList().get(0);
+    ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0);
+    if (wmCol.getType().getKind() != TYPE_TIMESTAMP) {

Review comment:
       nit: Use checkArgument?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFToPTransform.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+
+/** Provides a function that produces a PCollection based on TVF and upstream PCollection. */
+public interface TVFToPTransform {

Review comment:
       This isn't a `public interface`, it is a private implementation detail of `BeamTableFunctionScanRel`. Please move it there.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -4805,6 +4805,93 @@ public void testTVFTumbleAggregation() {
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test

Review comment:
       nit: Are these really `ZetaSQLDialectSpecTests`? There are no matching compliance tests. It's probably worth moving these tests into their own file.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),
+          "Only support %s table-valued functions. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF,
           operatorName);
-      RexCall call = ((RexCall) getCall());
-      RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
-      PCollection<Row> upstream = input.get(0);
-      Schema outputSchema = CalciteUtils.toSchema(getRowType());
-      FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
-      PCollection<Row> streamWithWindowMetadata =
-          upstream
-              .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
-              .setRowSchema(outputSchema);
-
-      PCollection<Row> windowedStream =
-          assignTimestampsAndWindow(
-              streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
 
-      return windowedStream;
+      return tvfToPTransformMap.get(operatorName).toPTransform(((RexCall) getCall()), input.get(0));
     }
 
     /** Extract timestamps from the windowFieldIndex, then window into windowFns. */
     private PCollection<Row> assignTimestampsAndWindow(
-        PCollection<Row> upstream, int windowFieldIndex, WindowFn<Row, IntervalWindow> windowFn) {
+        PCollection<Row> upstream,
+        int windowFieldIndex,
+        WindowFn<Object, IntervalWindow> windowFn) {

Review comment:
       Why did `Row` become `Object` here? How can we keep this as `Row`?

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -95,29 +186,19 @@ public TableFunctionScan copy(
           input);
       String operatorName = ((RexCall) getCall()).getOperator().getName();
       checkArgument(
-          operatorName.equals("TUMBLE"),
-          "Only support TUMBLE table-valued function. Current operator: %s",
+          TVFStreamingUtils.WINDOWING_TVF.contains(operatorName),

Review comment:
       `TVFStreamingUtils.WINDOWING_TVF` is not correct here, you should be checking against the map keys `tvfToPTransformMap.keySet()` (or you could make this the default case on a switch statement).
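
If the map-based dispatch is kept, the point above can be sketched as follows: validating against the map's own key set means the check and the lookup cannot drift apart. This is an illustration only; `TvfRegistrySketch` and `TRANSLATORS` are hypothetical names, and strings stand in for the real Calcite and Beam types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/** Illustrative registry whose validation uses the same map as its lookup. */
final class TvfRegistrySketch {
  private static final Map<String, UnaryOperator<String>> TRANSLATORS = new LinkedHashMap<>();

  static {
    TRANSLATORS.put("TUMBLE", in -> "TUMBLE(" + in + ")");
    TRANSLATORS.put("HOP", in -> "HOP(" + in + ")");
  }

  /** Rejects any operator without a registered translator, so get() never returns null. */
  static String translate(String operatorName, String input) {
    if (!TRANSLATORS.containsKey(operatorName)) {
      throw new IllegalArgumentException(
          String.format(
              "Only support %s table-valued functions. Current operator: %s",
              TRANSLATORS.keySet(), operatorName));
    }
    return TRANSLATORS.get(operatorName).apply(input);
  }
}
```

Here `SESSION` is deliberately left unregistered: the call is rejected with a clear message instead of hitting a `NullPointerException` on the lookup, which is the risk when the check uses a separately maintained set.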

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog catalog, AnalyzerOptions
     // TUMBLE
     catalog.addTableValuedFunction(
         new TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
-            ImmutableList.of("TUMBLE"),
+            ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+            new FunctionSignature(
+                retType, ImmutableList.of(inputTableType, descriptorType, stringType), 123),

Review comment:
       The constant `123` here (and repeated below) is supposed to be a globally unique ID out of `com.google.zetasql.ZetaSQLFunction.FunctionSignatureId`. This one is `FN_BITWISE_NOT_UINT64`. I'm pretty sure that doesn't match this function signature. If we don't care about these, could you use `com.google.zetasql.ZetaSQLFunction.FunctionSignatureId.FN_INVALID_FUNCTION_ID`?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -607,7 +630,13 @@ private RexInputRef convertWatermarkedResolvedColumnToRexInputRef(
 
   private ResolvedColumn extractWatermarkColumnFromDescriptor(
       ResolvedNodes.ResolvedDescriptor descriptor) {
-    return descriptor.getDescriptorColumnList().get(0);
+    ResolvedColumn wmCol = descriptor.getDescriptorColumnList().get(0);
+    if (wmCol.getType().getKind() != TYPE_TIMESTAMP) {
+      throw new IllegalArgumentException(
+          "Watermarked column should be TIMESTAMP type: "
+              + extractWatermarkColumnNameFromDescriptor(descriptor));
+    }
+    return wmCol;
   }
 
   private String extractWatermarkColumnNameFromDescriptor(

Review comment:
       nit: inline this function?







[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-643016917


   Thank you Andrew for all the work to review this PR! 





[GitHub] [beam] amaliujia merged pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia merged pull request #11868:
URL: https://github.com/apache/beam/pull/11868


   





[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-636229815


   R: @robinyqiu @apilloud @kennknowles 
   cc: @tysonjh 





[GitHub] [beam] apilloud commented on a change in pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #11868:
URL: https://github.com/apache/beam/pull/11868#discussion_r437068988



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamTableFunctionScanRel.java
##########
@@ -85,6 +97,85 @@ public TableFunctionScan copy(
   }
 
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+    private TVFToPTransform tumbleToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          FixedWindows windowFn = FixedWindows.of(durationParameter(call.getOperands().get(2)));
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new FixedWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), (WindowFn) windowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform hopToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+
+          Duration size = durationParameter(call.getOperands().get(2));
+          Duration period = durationParameter(call.getOperands().get(3));
+          SlidingWindows windowFn = SlidingWindows.of(size).every(period);
+          PCollection<Row> streamWithWindowMetadata =
+              upstream
+                  .apply(ParDo.of(new SlidingWindowDoFn(windowFn, wmCol.getIndex(), outputSchema)))
+                  .setRowSchema(outputSchema);
+
+          // Sliding window needs this special WindowFn to assign windows based on window_start,
+          // window_end metadata.
+          WindowFn specialWindowFn = TVFSlidingWindowFn.of(size, period);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(
+                  streamWithWindowMetadata, wmCol.getIndex(), specialWindowFn);
+
+          return windowedStream;
+        };
+
+    private TVFToPTransform sessionToPTransform =
+        (call, upstream) -> {
+          RexInputRef wmCol = (RexInputRef) call.getOperands().get(1);
+          Duration gap = durationParameter(call.getOperands().get(2));
+
+          Sessions sessions = Sessions.withGapDuration(gap);
+
+          PCollection<Row> windowedStream =
+              assignTimestampsAndWindow(upstream, wmCol.getIndex(), sessions);
+
+          Schema outputSchema = CalciteUtils.toSchema(getRowType());
+          // To extract session's window metadata, we apply a GroupByKey with a dummy key. It is
+          // because
+          // session is merging window. After GBK, SessionWindowDoFn will help extract window_start,
+          // window_end metadata.
+          PCollection<Row> streamWithWindowMetadata =
+              windowedStream
+                  .apply(WithKeys.of("dummy"))

Review comment:
       This product is for processing big data so 'super large' is the target workflow. Can you point me at the existing code that does this?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -206,7 +206,37 @@ private void addBuiltinFunctionsToCatalog(SimpleCatalog catalog, AnalyzerOptions
     // TUMBLE
     catalog.addTableValuedFunction(
         new TableValuedFunction.ForwardInputSchemaToOutputSchemaWithAppendedColumnTVF(
-            ImmutableList.of("TUMBLE"),
+            ImmutableList.of(TVFStreamingUtils.FIXED_WINDOW_TVF),
+            new FunctionSignature(
+                retType, ImmutableList.of(inputTableType, descriptorType, stringType), 123),

Review comment:
       `-1` is `__FunctionSignatureId__switch_must_have_a_default__`, but I don't see any harm in using that.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/TVFStreamingUtils.java
##########
@@ -17,8 +17,17 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.utils;
 
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
 /** Provides static constants or utils for TVF streaming. */
 public class TVFStreamingUtils {
   public static final String WINDOW_START = "window_start";
   public static final String WINDOW_END = "window_end";
+
+  public static final String FIXED_WINDOW_TVF = "TUMBLE";
+  public static final String SLIDING_WINDOW_TVF = "HOP";
+  public static final String SESSION_WINDOW_TVF = "SESSION";
+
+  public static final ImmutableSet<String> WINDOWING_TVF =

Review comment:
       This appears to be unused now?







[GitHub] [beam] amaliujia commented on pull request #11868: [BEAM-9363] Support HOP and SESSION as TVF

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11868:
URL: https://github.com/apache/beam/pull/11868#issuecomment-642821565


   Added KeyCol descriptor support with tests (single key, multiple key tests, etc.). Also squashed commits.

