Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/10 21:46:44 UTC

[GitHub] [beam] jhnmora000 opened a new pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

jhnmora000 opened a new pull request #11975:
URL: https://github.com/apache/beam/pull/11975


   A proof-of-concept implementation of a cumulative sum using analytic functions.
   
   Implemented in: BeamAnalyticFunctionsExperimentTest.testOverCumulativeSum()
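To make the target semantics concrete, here is a minimal plain-Java sketch (not Beam SQL code) of what a cumulative sum such as `SUM(value) OVER (PARTITION BY key ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` computes. The class `CumulativeSumSketch`, the `InputRow` type and the columns `key`, `ts` and `value` are hypothetical. It assumes ROWS framing, a single ascending order key with no ties, and no null handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of SUM(value) OVER (PARTITION BY key ORDER BY ts), ROWS framing. */
public class CumulativeSumSketch {

  /** Hypothetical input row: partition key, ordering column, and value to sum. */
  public static final class InputRow {
    final String key;
    final long ts;
    final long value;

    public InputRow(String key, long ts, long value) {
      this.key = key;
      this.ts = ts;
      this.value = value;
    }
  }

  /** Returns the running sum for each row, aligned with the input order. */
  public static long[] cumulativeSum(List<InputRow> rows) {
    // PARTITION BY key: collect the indices of the rows belonging to each partition.
    Map<String, List<Integer>> partitions = new HashMap<>();
    for (int i = 0; i < rows.size(); i++) {
      partitions.computeIfAbsent(rows.get(i).key, k -> new ArrayList<>()).add(i);
    }
    long[] result = new long[rows.size()];
    for (List<Integer> indices : partitions.values()) {
      // ORDER BY ts ASC inside each partition.
      indices.sort(Comparator.comparingLong(i -> rows.get(i).ts));
      long running = 0;
      for (int i : indices) {
        // Frame = first row of the partition up to and including the current row.
        running += rows.get(i).value;
        result[i] = running; // written by index, so partition iteration order is irrelevant
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<InputRow> rows =
        List.of(
            new InputRow("a", 2, 10),
            new InputRow("b", 1, 5),
            new InputRow("a", 1, 1),
            new InputRow("a", 3, 100));
    System.out.println(Arrays.toString(cumulativeSum(rows))); // prints [11, 5, 1, 111]
  }
}
```

Partition `a` is visited in `ts` order (1, 10, 100), giving running sums 1, 11 and 111, while partition `b` has a single row; results are reported in the original row order.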
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439799050



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(

Review comment:
       Oh, now I see. The calls are in `Group`.








[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-642302788


   also cc @Mark-Zeng.





[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r445107329



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(
+                                partitionKeysDef,
+                                orderByKeys,
+                                orderByDirections,
+                                orderByNullDirections,
+                                lowerBFinal,
+                                upperBFinal,
+                                anAnalyticGroup.isRows,
+                                argList,
+                                combineFn,
+                                field);
+                        analyticFields.add(fieldAggregation);
+                      });
+            });
+
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    private Combine.CombineFn combineFn;
+    private Schema.Field outputField;
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> inputFields,
+        Combine.CombineFn combineFn,
+        Schema.Field outputField) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = inputFields;
+      this.combineFn = combineFn;
+      this.outputField = outputField;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema schema, List<FieldAggregation> fieldAgg) {
+      this.outputSchema = schema;
+      this.aggFields = fieldAgg;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> inputData = input.get(0);
+      Schema inputSchema = inputData.getSchema();
+      for (FieldAggregation af : aggFields) {
+        if (af.partitionKeys.isEmpty()) {
+          // This section simulates a KV Row, similar to the output of
+          // Group.byFieldIds, for the case where no partition keys are specified.
+          Schema inputSch = inputData.getSchema();
+          Schema mockKeySchema =
+              Schema.of(Schema.Field.of("mock", Schema.FieldType.STRING.withNullable(true)));
+          Schema simulatedKeyValueSchema =
+              Schema.of(
+                  Schema.Field.of("key", Schema.FieldType.row(mockKeySchema)),
+                  Schema.Field.of(
+                      "value", Schema.FieldType.iterable(Schema.FieldType.row(inputSch))));
+          PCollection<Iterable<Row>> apply =
+              inputData.apply(org.apache.beam.sdk.schemas.transforms.Group.globally());
+          inputData =
+              apply
+                  .apply(ParDo.of(uniquePartition(mockKeySchema, simulatedKeyValueSchema)))
+                  .setRowSchema(simulatedKeyValueSchema);
+        } else {
+          org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+              org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+          inputData = inputData.apply("partitionBy", myg);

Review comment:
       In fact, if there is no ORDER BY but only PARTITION BY, combine per key is the best API, because the backend can apply the combiner lifting optimization, which works as follows:
   
   KV -> GroupByKey -> KV -> Combine
   can be optimized as
   KV -> local combine -> K CombinedV -> GroupByKey -> Combine all combined V.
   
   So there can be a pre-combine step before the shuffle (GroupByKey), and after it each worker only needs to combine the pre-combined values (which is valid because combining is associative). This optimization greatly reduces the amount of data sent through the shuffle.
   
   I think this can be left for future work. Please log a JIRA to document this idea.
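The combiner lifting rewrite described above can be modelled without Beam. The sketch below is a plain-Java simulation, assuming a per-key SUM and treating each inner list as the elements one worker holds before the shuffle; `CombinerLiftingSketch` and `Pair` are hypothetical names, not Beam APIs. It checks that pre-combining per worker and then merging the partial sums gives the same result as grouping first, while fewer records pass through the simulated shuffle. In Beam itself, expressing the aggregation as `Combine.perKey` is what allows a runner that supports combiner lifting to apply this rewrite.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of combiner lifting for a per-key SUM. */
public class CombinerLiftingSketch {

  /** Simple key/value pair standing in for a Beam KV. */
  public static final class Pair {
    final String key;
    final long value;

    public Pair(String key, long value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Without lifting: every element crosses the shuffle, then values are summed per key. */
  public static Map<String, Long> groupThenCombine(List<List<Pair>> workers) {
    Map<String, List<Long>> shuffled = new HashMap<>();
    for (List<Pair> bundle : workers) {
      for (Pair p : bundle) {
        shuffled.computeIfAbsent(p.key, k -> new ArrayList<>()).add(p.value);
      }
    }
    Map<String, Long> result = new HashMap<>();
    shuffled.forEach((k, vs) -> result.put(k, vs.stream().mapToLong(Long::longValue).sum()));
    return result;
  }

  /** With lifting: combine locally per worker, shuffle partial sums, then merge them. */
  public static Map<String, Long> liftedCombine(List<List<Pair>> workers) {
    List<Map<String, Long>> partials = new ArrayList<>();
    for (List<Pair> bundle : workers) {
      Map<String, Long> local = new HashMap<>();
      for (Pair p : bundle) {
        local.merge(p.key, p.value, Long::sum); // local pre-combine before the shuffle
      }
      partials.add(local);
    }
    // Merging partial sums is valid because SUM is associative and commutative.
    Map<String, Long> result = new HashMap<>();
    for (Map<String, Long> local : partials) {
      local.forEach((k, v) -> result.merge(k, v, Long::sum));
    }
    return result;
  }

  /** Counts records crossing the shuffle: raw elements, or one partial per key per worker. */
  public static int shuffledRecords(List<List<Pair>> workers, boolean lifted) {
    int count = 0;
    for (List<Pair> bundle : workers) {
      count += lifted ? (int) bundle.stream().map(p -> p.key).distinct().count() : bundle.size();
    }
    return count;
  }

  public static void main(String[] args) {
    List<List<Pair>> workers =
        List.of(
            List.of(new Pair("a", 1), new Pair("a", 2), new Pair("b", 3)),
            List.of(new Pair("a", 4), new Pair("b", 5), new Pair("b", 6)));
    System.out.println(groupThenCombine(workers).equals(liftedCombine(workers))); // prints true
    System.out.println(
        shuffledRecords(workers, false) + " vs " + shuffledRecords(workers, true)); // prints 6 vs 4
  }
}
```

With the sample input, both strategies produce `a=7` and `b=14`, but the lifted version sends 4 records through the shuffle instead of 6. The saving grows with the number of elements per key held by each worker.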







[GitHub] [beam] jhnmora000 commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r449962950



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(
+                                partitionKeysDef,
+                                orderByKeys,
+                                orderByDirections,
+                                orderByNullDirections,
+                                lowerBFinal,
+                                upperBFinal,
+                                anAnalyticGroup.isRows,
+                                argList,
+                                combineFn,
+                                field);
+                        analyticFields.add(fieldAggregation);
+                      });
+            });
+
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    private Combine.CombineFn combineFn;
+    private Schema.Field outputField;
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> inputFields,
+        Combine.CombineFn combineFn,
+        Schema.Field outputField) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = inputFields;
+      this.combineFn = combineFn;
+      this.outputField = outputField;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema schema, List<FieldAggregation> fieldAgg) {
+      this.outputSchema = schema;
+      this.aggFields = fieldAgg;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> inputData = input.get(0);
+      Schema inputSchema = inputData.getSchema();
+      for (FieldAggregation af : aggFields) {
+        if (af.partitionKeys.isEmpty()) {
+          // This section simulates a KV Row
+          // Similar to the output of Group.byFieldIds
+          // When no partitions are specified
+          Schema inputSch = inputData.getSchema();
+          Schema mockKeySchema =
+              Schema.of(Schema.Field.of("mock", Schema.FieldType.STRING.withNullable(true)));
+          Schema simulatedKeyValueSchema =
+              Schema.of(
+                  Schema.Field.of("key", Schema.FieldType.row(mockKeySchema)),
+                  Schema.Field.of(
+                      "value", Schema.FieldType.iterable(Schema.FieldType.row(inputSch))));
+          PCollection<Iterable<Row>> apply =
+              inputData.apply(org.apache.beam.sdk.schemas.transforms.Group.globally());
+          inputData =
+              apply
+                  .apply(ParDo.of(uniquePartition(mockKeySchema, simulatedKeyValueSchema)))
+                  .setRowSchema(simulatedKeyValueSchema);
+        } else {
+          org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+              org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+          inputData = inputData.apply("partitionBy", myg);

Review comment:
       I added a comment on the project's JIRA [`issue`](https://issues.apache.org/jira/browse/BEAM-9198?focusedCommentId=17151728&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17151728).
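The unpartitioned branch in `expand()` fakes a single "mock" key so that both branches hand the same key/value shape to the later sort and aggregation steps. As a plain-Java illustration of that idea only (the class and method names are hypothetical, and rows are modeled as `List<Object>` rather than Beam `Row`s), partitioning by an empty list of key columns naturally yields one global partition:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of PARTITION BY; each row is a list of column values. */
public class PartitionSketch {

  // Groups rows by the values at keyIndices. With no key columns, every row
  // maps to the same empty key, so the result is a single global partition
  // (the role the "mock" key plays in the unpartitioned branch).
  static Map<List<Object>, List<List<Object>>> partitionBy(
      List<List<Object>> rows, List<Integer> keyIndices) {
    Map<List<Object>, List<List<Object>>> partitions = new LinkedHashMap<>();
    for (List<Object> row : rows) {
      List<Object> key = new ArrayList<>();
      for (int idx : keyIndices) {
        key.add(row.get(idx));
      }
      partitions.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<List<Object>> rows =
        Arrays.asList(
            Arrays.<Object>asList("a", 1),
            Arrays.<Object>asList("b", 2),
            Arrays.<Object>asList("a", 3));
    System.out.println(partitionBy(rows, Arrays.asList(0)).size()); // 2 partitions
    System.out.println(partitionBy(rows, new ArrayList<>()).size()); // 1 partition
  }
}
```

Keys are stored as lists, so equality and hashing follow `List.equals`, and null key values are allowed.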




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jhnmora000 commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-642285036


   R: @amaliujia




[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658463117


   Run Java PreCommit




[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439796682



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(

Review comment:
       Interesting. I didn't know that there are no aggregate calls in WindowRel.
   
   Do you know where those calls are defined?






[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-656931101


   retest this please.




[GitHub] [beam] jhnmora000 commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-646204467


   Hi.
   I forgot to push my latest changes.
   These two tests are working now:
   - BeamAnalyticFunctionsExperimentTest.testOverCumulativeSum()
   - BeamAnalyticFunctionsExperimentTest.testSimpleOverFunction()
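`testOverCumulativeSum()` exercises a SUM over a ROWS frame that runs from UNBOUNDED PRECEDING to CURRENT ROW. In the diffs quoted in this thread, each frame bound is stored as an offset from the current row, with 0 for CURRENT ROW and `Integer.MAX_VALUE` for an unbounded side. The following is a rough sketch of evaluating such a frame over one already-sorted partition, assuming non-null INTEGER inputs and SUM as the aggregate (class and method names are hypothetical). It widens to `long` before adding the offsets, because in `int` arithmetic `idx + upperLimit + 1` with an unbounded upper limit overflows for every row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: SUM over a ROWS frame within one sorted partition. */
public class RowsFrameSketch {
  // Integer.MAX_VALUE marks an unbounded side of the frame, as in the diff.
  static final int UNBOUNDED = Integer.MAX_VALUE;

  // lowerOffset: rows before the current one that are included (0 = CURRENT ROW).
  // upperOffset: rows after the current one that are included (0 = CURRENT ROW).
  // Assumes non-null values; SQL SUM would skip nulls instead.
  static List<Integer> windowedSum(List<Integer> sorted, int lowerOffset, int upperOffset) {
    List<Integer> out = new ArrayList<>(sorted.size());
    for (int idx = 0; idx < sorted.size(); idx++) {
      // Widen to long so that idx + UNBOUNDED + 1 cannot overflow int.
      long lo = Math.max(0L, (long) idx - lowerOffset);
      long hi = Math.min((long) sorted.size(), (long) idx + upperOffset + 1);
      int sum = 0;
      for (int j = (int) lo; j < hi; j++) {
        sum += sorted.get(j);
      }
      out.add(sum);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> values = Arrays.asList(1, 2, 3, 4);
    // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: running total
    System.out.println(windowedSum(values, UNBOUNDED, 0)); // [1, 3, 6, 10]
    // ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: partition total
    System.out.println(windowedSum(values, UNBOUNDED, UNBOUNDED)); // [10, 10, 10, 10]
  }
}
```

Each frame is re-summed from scratch, which mirrors the per-row loop in `aggField` but costs O(n²) per partition; for the cumulative case, a single running accumulator would produce the same result in one pass.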




[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439795971



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields =
+        this.groups.stream()
+            .map(
+                anAnalyticGroup -> {
+                  List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+                  List<Integer> orderByKeys = Lists.newArrayList();
+                  List<Boolean> orderByDirections = Lists.newArrayList();
+                  List<Boolean> orderByNullDirections = Lists.newArrayList();
+                  anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                      .forEach(
+                          fc -> {
+                            orderByKeys.add(fc.getFieldIndex());
+                            orderByDirections.add(
+                                fc.direction == RelFieldCollation.Direction.ASCENDING);
+                            orderByNullDirections.add(
+                                fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                          });
+                  int lowerB = Integer.MAX_VALUE; // Unbounded by default
+                  int upperB = Integer.MAX_VALUE; // Unbounded by default
+                  if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                    lowerB = 0;
+                  } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                    // pending
+                  }
+                  if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                    upperB = 0;
+                  } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                    // pending
+                  }
+                  // Assume a single input for now
+                  final List<Integer> aggregationFields = Lists.newArrayList();
+                  anAnalyticGroup.aggCalls.stream()
+                      .forEach(
+                          anAggCall -> {
+                            anAggCall.operands.stream()
+                                .forEach(
+                                    anAggCallInput -> {
+                                      aggregationFields.add(
+                                          ((RexInputRef) anAggCallInput).getIndex());
+                                    });
+                          });
+                  return new FieldAggregation(
+                      partitionKeysDef,
+                      orderByKeys,
+                      orderByDirections,
+                      orderByNullDirections,
+                      lowerB,
+                      upperB,
+                      anAnalyticGroup.isRows,
+                      aggregationFields);
+                })
+            .collect(toList());
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    // private AggFunction  ... pending
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> fields) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = fields;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema s, List<FieldAggregation> af) {
+      this.outputSchema = s;
+      this.aggFields = af;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> r = input.get(0);
+      for (FieldAggregation af : aggFields) {
+        org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+            org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+        r = r.apply("partitionBy", myg);
+        r = r.apply("orderBy", ParDo.of(sortPartition(af))).setRowSchema(r.getSchema());
+        r = r.apply("aggCall", ParDo.of(aggField(outputSchema, af))).setRowSchema(outputSchema);
+      }
+      return r;
+    }
+  }
+
+  private static DoFn<Row, Row> aggField(
+      final Schema outputSchema, final FieldAggregation fieldAgg) {
+    return new DoFn<Row, Row>() {
+      @ProcessElement
+      public void processElement(
+          @Element Row inputPartition, OutputReceiver<Row> out, ProcessContext c) {
+        Collection<Row> inputPartitions = inputPartition.getArray(1); // 1 -> value
+        List<Row> sortedRowsAsList = new ArrayList<Row>(inputPartitions);
+        for (int idx = 0; idx < sortedRowsAsList.size(); idx++) {
+          int lowerIndex = idx - fieldAgg.lowerLimit;
+          int upperIndex = idx + fieldAgg.upperLimit + 1;
+          lowerIndex = lowerIndex < 0 ? 0 : lowerIndex;
+          upperIndex = upperIndex > sortedRowsAsList.size() ? sortedRowsAsList.size() : upperIndex;
+          List<Row> aggRange = sortedRowsAsList.subList(lowerIndex, upperIndex);
+
+          // Just concept-proof
+          // Assume that aggFun = SUM
+          // Assume dataType = INTEGER
+          final Combine.CombineFn<Integer, int[], Integer> aggFunction =
+              (Combine.CombineFn<Integer, int[], Integer>)
+                  BeamBuiltinAggregations.create("SUM", Schema.FieldType.INT32);
+          int[] aggAccumulator = aggFunction.createAccumulator();
+
+          // Assume a simple expression within SUM($aUniqueDirectField)
+          final int aggFieldIndex = fieldAgg.inputFields.get(0);
+
+          for (Row aggRow : aggRange) {
+            Integer valueToAgg = aggRow.getInt32(aggFieldIndex);
+            aggFunction.addInput(aggAccumulator, valueToAgg);
+          }
+          Integer aggOutput = aggFunction.extractOutput(aggAccumulator);
+          List<Object> fieldValues =
+              Lists.newArrayListWithCapacity(sortedRowsAsList.get(idx).getFieldCount());
+          fieldValues.addAll(sortedRowsAsList.get(idx).getValues());
+          fieldValues.add(aggOutput);
+          Row ou = Row.withSchema(outputSchema).addValues(fieldValues).build();
+          out.output(ou);
+        }
+      }
+    };
+  }
+
+  private static DoFn<Row, Row> sortPartition(final FieldAggregation fieldAgg) {

Review comment:
       It is not a bad idea to start from this ParDo, though. I posted that transform just out of curiosity; using SortValues is not required at this point.
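Whether the sort step stays a ParDo or later moves to SortValues, it has to honor the three parallel lists that `buildPTransform` collects from the field collations: `orderKeys` (column indices), `orderOrientations` (true for ASCENDING) and `orderNulls` (true for NULLS FIRST). The ordering itself can be sketched as a plain-Java `Comparator`, assuming rows modeled as `List<Object>` and mutually `Comparable` non-null values (all names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of ORDER BY with per-key direction and null placement. */
public class OrderBySketch {

  // Compares two possibly-null column values; non-null values must be mutually Comparable.
  @SuppressWarnings("unchecked")
  static int compareValues(Object a, Object b, boolean ascending, boolean nullsFirst) {
    if (a == null || b == null) {
      if (a == b) {
        return 0;
      }
      // Null placement is decided here and does not depend on the sort direction.
      return (a == null) == nullsFirst ? -1 : 1;
    }
    int sign = Integer.signum(((Comparable<Object>) a).compareTo(b));
    return ascending ? sign : -sign;
  }

  // Builds a comparator from the parallel key/direction/null-ordering lists.
  static Comparator<List<Object>> orderBy(
      List<Integer> keys, List<Boolean> ascending, List<Boolean> nullsFirst) {
    return (r1, r2) -> {
      for (int i = 0; i < keys.size(); i++) {
        int col = keys.get(i);
        int cmp = compareValues(r1.get(col), r2.get(col), ascending.get(i), nullsFirst.get(i));
        if (cmp != 0) {
          return cmp; // the first key that differs decides the order
        }
      }
      return 0;
    };
  }

  public static void main(String[] args) {
    List<List<Object>> rows =
        new ArrayList<>(
            Arrays.asList(
                Arrays.<Object>asList("a", 2),
                Arrays.<Object>asList("a", null),
                Arrays.<Object>asList("a", 5)));
    // ORDER BY column 1 DESC NULLS FIRST
    rows.sort(orderBy(Arrays.asList(1), Arrays.asList(false), Arrays.asList(true)));
    System.out.println(rows); // [[a, null], [a, 5], [a, 2]]
  }
}
```

Because nulls are placed before the direction is applied, `DESC NULLS FIRST` still puts nulls first instead of flipping them to the end, which matches the independent `direction` and `nullDirection` fields on Calcite's `RelFieldCollation`.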






[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658460751


   Run Java PreCommit




[GitHub] [beam] jhnmora000 commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658417727


   retest this please




[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r445107329



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(
+                                partitionKeysDef,
+                                orderByKeys,
+                                orderByDirections,
+                                orderByNullDirections,
+                                lowerBFinal,
+                                upperBFinal,
+                                anAnalyticGroup.isRows,
+                                argList,
+                                combineFn,
+                                field);
+                        analyticFields.add(fieldAggregation);
+                      });
+            });
+
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    private Combine.CombineFn combineFn;
+    private Schema.Field outputField;
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> inputFields,
+        Combine.CombineFn combineFn,
+        Schema.Field outputField) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = inputFields;
+      this.combineFn = combineFn;
+      this.outputField = outputField;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema schema, List<FieldAggregation> fieldAgg) {
+      this.outputSchema = schema;
+      this.aggFields = fieldAgg;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> inputData = input.get(0);
+      Schema inputSchema = inputData.getSchema();
+      for (FieldAggregation af : aggFields) {
+        if (af.partitionKeys.isEmpty()) {
+          // This section simulates a KV Row, similar to the output of
+          // Group.byFieldIds, for the case where no partition keys are
+          // specified.
+          Schema inputSch = inputData.getSchema();
+          Schema mockKeySchema =
+              Schema.of(Schema.Field.of("mock", Schema.FieldType.STRING.withNullable(true)));
+          Schema simulatedKeyValueSchema =
+              Schema.of(
+                  Schema.Field.of("key", Schema.FieldType.row(mockKeySchema)),
+                  Schema.Field.of(
+                      "value", Schema.FieldType.iterable(Schema.FieldType.row(inputSch))));
+          PCollection<Iterable<Row>> apply =
+              inputData.apply(org.apache.beam.sdk.schemas.transforms.Group.globally());
+          inputData =
+              apply
+                  .apply(ParDo.of(uniquePartition(mockKeySchema, simulatedKeyValueSchema)))
+                  .setRowSchema(simulatedKeyValueSchema);
+        } else {
+          org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+              org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+          inputData = inputData.apply("partitionBy", myg);

Review comment:
       In fact, if there is only a PARTITION BY and no ORDER BY, `Combine.perKey` would be the best API, because the runner can apply the combiner-lifting optimization, which works as follows:
   
   KV -> GroupByKey -> KV -> Combine
   can be optimized into
   KV -> local combine -> KV<K, combined V> -> GroupByKey -> combine all the pre-combined Vs
   
   So there can be a pre-combine step before the shuffle (GroupByKey), after which each worker only needs to combine the pre-combined values (this is valid because the combine function is associative). This optimization can greatly reduce the amount of data sent through the shuffle.
   
   I think this can be left for future work. Please file a JIRA issue to document this idea.
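A minimal sketch of the combiner-lifting idea described above, written as a plain-Java simulation rather than Beam code. The "workers" lists are hypothetical stand-ins for the bundles a runner processes, and the class and method names are invented for illustration. The sketch sums values per key both ways and counts how many records would cross the simulated shuffle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java simulation of combiner lifting for a per-key SUM. This is not Beam code: each inner
 * list is a hypothetical stand-in for the elements one worker holds before the shuffle.
 */
public class CombinerLiftingSketch {

  /** Per-key sums plus the number of records that crossed the simulated shuffle. */
  static final class Result {
    final Map<String, Integer> sums;
    final int shuffledRecords;

    Result(Map<String, Integer> sums, int shuffledRecords) {
      this.sums = sums;
      this.shuffledRecords = shuffledRecords;
    }
  }

  /** KV -> GroupByKey -> Combine: every raw element is shuffled, then summed per key. */
  static Result groupThenCombine(List<List<Map.Entry<String, Integer>>> workers) {
    Map<String, List<Integer>> grouped = new HashMap<>();
    int shuffled = 0;
    for (List<Map.Entry<String, Integer>> worker : workers) {
      for (Map.Entry<String, Integer> e : worker) {
        grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        shuffled++;
      }
    }
    return new Result(sumPerKey(grouped), shuffled);
  }

  /** KV -> local combine -> GroupByKey -> combine the partial sums. */
  static Result liftedCombine(List<List<Map.Entry<String, Integer>>> workers) {
    Map<String, List<Integer>> grouped = new HashMap<>();
    int shuffled = 0;
    for (List<Map.Entry<String, Integer>> worker : workers) {
      // Local combine: pre-sum this worker's values per key before anything is shuffled.
      Map<String, Integer> partial = new HashMap<>();
      for (Map.Entry<String, Integer> e : worker) {
        partial.merge(e.getKey(), e.getValue(), Integer::sum);
      }
      // Only one partial sum per key per worker crosses the shuffle.
      for (Map.Entry<String, Integer> p : partial.entrySet()) {
        grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        shuffled++;
      }
    }
    // Combining partial sums gives the same answer because addition is associative.
    return new Result(sumPerKey(grouped), shuffled);
  }

  private static Map<String, Integer> sumPerKey(Map<String, List<Integer>> grouped) {
    Map<String, Integer> sums = new HashMap<>();
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      int total = 0;
      for (int v : e.getValue()) {
        total += v;
      }
      sums.put(e.getKey(), total);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Integer>>> workers =
        List.of(
            List.of(Map.entry("vegetable", 23), Map.entry("fruit", 2), Map.entry("vegetable", 9)),
            List.of(Map.entry("fruit", 8), Map.entry("vegetable", 2), Map.entry("vegetable", 10)));
    Result naive = groupThenCombine(workers);
    Result lifted = liftedCombine(workers);
    System.out.println(naive.sums + " shuffled " + naive.shuffledRecords);
    System.out.println(lifted.sums + " shuffled " + lifted.shuffledRecords);
  }
}
```

With the six purchase rows from the test split across two workers, both plans give the same per-key sums, but the lifted plan shuffles one partial sum per key per worker (4 records) instead of all 6 raw elements. In an actual Beam pipeline, the equivalent would be to use `Combine.perKey` and leave the lifting to the runner.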




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-656995715


   Before this PR is merged, please:
   
   1) Run `./gradlew :sdks:java:extensions:sql:check` to check for style violations, and fix any that it reports.
   2) Rebase your PR onto the head of master.





[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439796555



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields =
+        this.groups.stream()
+            .map(
+                anAnalyticGroup -> {
+                  List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+                  List<Integer> orderByKeys = Lists.newArrayList();
+                  List<Boolean> orderByDirections = Lists.newArrayList();
+                  List<Boolean> orderByNullDirections = Lists.newArrayList();
+                  anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                      .forEach(
+                          fc -> {
+                            orderByKeys.add(fc.getFieldIndex());
+                            orderByDirections.add(
+                                fc.direction == RelFieldCollation.Direction.ASCENDING);
+                            orderByNullDirections.add(
+                                fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                          });
+                  int lowerB = Integer.MAX_VALUE; // Unbounded by default
+                  int upperB = Integer.MAX_VALUE; // Unbounded by default
+                  if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                    lowerB = 0;
+                  } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                    // pending
+                  }
+                  if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                    upperB = 0;
+                  } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                    // pending
+                  }
+                  // Assume a single input for now
+                  final List<Integer> aggregationFields = Lists.newArrayList();
+                  anAnalyticGroup.aggCalls.stream()
+                      .forEach(
+                          anAggCall -> {
+                            anAggCall.operands.stream()
+                                .forEach(
+                                    anAggCallInput -> {
+                                      aggregationFields.add(
+                                          ((RexInputRef) anAggCallInput).getIndex());
+                                    });
+                          });
+                  return new FieldAggregation(
+                      partitionKeysDef,
+                      orderByKeys,
+                      orderByDirections,
+                      orderByNullDirections,
+                      lowerB,
+                      upperB,
+                      anAnalyticGroup.isRows,
+                      aggregationFields);
+                })
+            .collect(toList());
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    // private AggFunction  ... pending
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> fields) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = fields;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema s, List<FieldAggregation> af) {
+      this.outputSchema = s;
+      this.aggFields = af;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> r = input.get(0);
+      for (FieldAggregation af : aggFields) {
+        org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+            org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);

Review comment:
       It is nice that this uses the schema transforms.
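The `Transform.expand` method under review follows a partition, sort, then aggregate-over-frame shape. The following plain-Java sketch mirrors that shape under simplifying assumptions: a single ascending ORDER BY key that is also the summed field, SUM as the only aggregate, and `InputRow` as a hypothetical stand-in for a Beam `Row`. It keeps the diff's encoding of frame bounds (`Integer.MAX_VALUE` for unbounded, `0` for the current row). One difference is deliberate: in the `aggField` DoFn quoted in this thread, `idx + fieldAgg.upperLimit + 1` is computed in `int`, which overflows when the upper bound is left at `Integer.MAX_VALUE`, so the sketch computes the frame bounds in `long` before clamping.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of the partition -> sort -> frame-aggregate shape of BeamWindowRel's
 * Transform. Not Beam code; InputRow is a hypothetical stand-in for a Beam Row.
 */
public class WindowFrameSketch {

  /** Same encoding as the diff: Integer.MAX_VALUE means UNBOUNDED and 0 means CURRENT ROW. */
  static final int UNBOUNDED = Integer.MAX_VALUE;

  /** Hypothetical input row whose value is both the ORDER BY key and the summed field. */
  static final class InputRow {
    final String partitionKey;
    final int value;

    InputRow(String partitionKey, int value) {
      this.partitionKey = partitionKey;
      this.value = value;
    }
  }

  /**
   * Returns {fromInclusive, toExclusive} for a ROWS frame around position idx, clamped to the
   * partition. Uses long arithmetic because idx + Integer.MAX_VALUE + 1 overflows an int.
   */
  static int[] frame(int idx, int size, int lowerLimit, int upperLimit) {
    long from = Math.max(0L, (long) idx - lowerLimit);
    long to = Math.min((long) size, (long) idx + upperLimit + 1);
    return new int[] {(int) from, (int) to};
  }

  /** Groups rows by partition key, sorts each partition ascending, then SUMs each row's frame. */
  static Map<String, List<Integer>> windowedSums(
      List<InputRow> rows, int lowerLimit, int upperLimit) {
    Map<String, List<InputRow>> partitions = new LinkedHashMap<>();
    for (InputRow row : rows) {
      partitions.computeIfAbsent(row.partitionKey, k -> new ArrayList<>()).add(row);
    }
    Map<String, List<Integer>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<InputRow>> partition : partitions.entrySet()) {
      List<InputRow> sorted = new ArrayList<>(partition.getValue());
      sorted.sort(Comparator.comparingInt((InputRow r) -> r.value));
      List<Integer> sums = new ArrayList<>();
      for (int idx = 0; idx < sorted.size(); idx++) {
        int[] bounds = frame(idx, sorted.size(), lowerLimit, upperLimit);
        int sum = 0;
        for (InputRow r : sorted.subList(bounds[0], bounds[1])) {
          sum += r.value;
        }
        sums.add(sum);
      }
      result.put(partition.getKey(), sums);
    }
    return result;
  }
}
```

`windowedSums(rows, UNBOUNDED, 0)` corresponds to ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, while `windowedSums(rows, UNBOUNDED, UNBOUNDED)` returns the partition total for every row, which is exactly the case that the `long` arithmetic guards against.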







[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r453159661



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+
+/**
+ * A simple Analytic Functions experiment for BeamSQL created in order to understand the query
+ * processing workflow of BeamSQL and Calcite.
+ */
+public class BeamAnalyticFunctionsExperimentTest extends BeamSqlDslBase {

Review comment:
       Can you remove "Experiment" from the class name? I am going to merge this PR, so it won't be an experiment anymore.




[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439796776



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * A simple Analytic Functions experiment for BeamSQL created in order to understand the query
+ * processing workflow of BeamSQL and Calcite.
+ */
+public class BeamAnalyticFunctionsExperimentTest extends BeamSqlDslBase {
+
+  /**
+   * Table schema and data taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#produce_table
+   *
+   * <p>Compute a cumulative sum query taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#compute_a_cumulative_sum
+   */
+  @Test
+  public void testOverCumulativeSum() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    Schema schema =
+        Schema.builder()
+            .addStringField("item")
+            .addStringField("category")
+            .addInt32Field("purchases")
+            .build();
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(
+                Create.of(
+                    TestUtils.rowsBuilderOf(schema)
+                        .addRows(
+                            "kale",
+                            "vegetable",
+                            23,
+                            "orange",
+                            "fruit",
+                            2,
+                            "cabbage",
+                            "vegetable",
+                            9,
+                            "apple",
+                            "fruit",
+                            8,
+                            "leek",
+                            "vegetable",
+                            2,
+                            "lettuce",
+                            "vegetable",
+                            10)
+                        .getRows()))
+            .setRowSchema(schema);
+    String sql =
+        "SELECT item, purchases, category, sum(purchases) over "
+            + "(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
+            + " as total_purchases  FROM PCOLLECTION";

Review comment:
       Can you paste the logical plan of this query into a comment? It would help me better understand what Calcite produces, at least at the logical plan level.
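Whatever plan Calcite produces, the expected output of the query above can be computed directly, which helps when checking the test. A minimal plain-Java sketch, assuming the six rows from the test and no ties in `purchases` within a category (with ties, the order of rows in a ROWS frame is not fully determined). `Purchase` and `cumulativeSum` are hypothetical names. Because the frame is UNBOUNDED PRECEDING AND CURRENT ROW, the sketch keeps one running total per category instead of re-aggregating a sub-list for every row, as the `aggField` DoFn quoted in this thread does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch of the query's expected result, using one running total per category.
 * Purchase is a hypothetical stand-in for the test's input rows.
 */
public class CumulativeSumSketch {

  static final class Purchase {
    final String item;
    final String category;
    final int purchases;

    Purchase(String item, String category, int purchases) {
      this.item = item;
      this.category = category;
      this.purchases = purchases;
    }
  }

  /** Returns item -> total_purchases, in the order rows are visited (ascending purchases). */
  static Map<String, Integer> cumulativeSum(List<Purchase> rows) {
    List<Purchase> sorted = new ArrayList<>(rows);
    // ORDER BY purchases: visiting all rows in ascending order keeps each category ordered too.
    sorted.sort(Comparator.comparingInt((Purchase p) -> p.purchases));
    Map<String, Integer> runningTotal = new HashMap<>(); // one entry per PARTITION BY category
    Map<String, Integer> result = new LinkedHashMap<>();
    for (Purchase p : sorted) {
      // UNBOUNDED PRECEDING AND CURRENT ROW: add the current row to its partition's prefix sum.
      int total = runningTotal.merge(p.category, p.purchases, Integer::sum);
      result.put(p.item, total);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Purchase> rows =
        List.of(
            new Purchase("kale", "vegetable", 23),
            new Purchase("orange", "fruit", 2),
            new Purchase("cabbage", "vegetable", 9),
            new Purchase("apple", "fruit", 8),
            new Purchase("leek", "vegetable", 2),
            new Purchase("lettuce", "vegetable", 10));
    System.out.println(cumulativeSum(rows));
  }
}
```

For the test rows this yields total_purchases of 2 and 10 for orange and apple, and 2, 11, 21 and 44 for leek, cabbage, lettuce and kale.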







[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r439795902



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexInputRef;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields =
+        this.groups.stream()
+            .map(
+                anAnalyticGroup -> {
+                  List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+                  List<Integer> orderByKeys = Lists.newArrayList();
+                  List<Boolean> orderByDirections = Lists.newArrayList();
+                  List<Boolean> orderByNullDirections = Lists.newArrayList();
+                  anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                      .forEach(
+                          fc -> {
+                            orderByKeys.add(fc.getFieldIndex());
+                            orderByDirections.add(
+                                fc.direction == RelFieldCollation.Direction.ASCENDING);
+                            orderByNullDirections.add(
+                                fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                          });
+                  int lowerB = Integer.MAX_VALUE; // Unbounded by default
+                  int upperB = Integer.MAX_VALUE; // Unbounded by default
+                  if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                    lowerB = 0;
+                  } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                    // pending
+                  }
+                  if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                    upperB = 0;
+                  } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                    // pending
+                  } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                    // pending
+                  }
+                  // Assume a single input for now
+                  final List<Integer> aggregationFields = Lists.newArrayList();
+                  anAnalyticGroup.aggCalls.stream()
+                      .forEach(
+                          anAggCall -> {
+                            anAggCall.operands.stream()
+                                .forEach(
+                                    anAggCallInput -> {
+                                      aggregationFields.add(
+                                          ((RexInputRef) anAggCallInput).getIndex());
+                                    });
+                          });
+                  return new FieldAggregation(
+                      partitionKeysDef,
+                      orderByKeys,
+                      orderByDirections,
+                      orderByNullDirections,
+                      lowerB,
+                      upperB,
+                      anAnalyticGroup.isRows,
+                      aggregationFields);
+                })
+            .collect(toList());
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    // private AggFunction  ... pending
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> fields) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = fields;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema s, List<FieldAggregation> af) {
+      this.outputSchema = s;
+      this.aggFields = af;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> r = input.get(0);
+      for (FieldAggregation af : aggFields) {
+        org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+            org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+        r = r.apply("partitionBy", myg);
+        r = r.apply("orderBy", ParDo.of(sortPartition(af))).setRowSchema(r.getSchema());
+        r = r.apply("aggCall", ParDo.of(aggField(outputSchema, af))).setRowSchema(outputSchema);
+      }
+      return r;
+    }
+  }
+
+  private static DoFn<Row, Row> aggField(
+      final Schema outputSchema, final FieldAggregation fieldAgg) {
+    return new DoFn<Row, Row>() {
+      @ProcessElement
+      public void processElement(
+          @Element Row inputPartition, OutputReceiver<Row> out, ProcessContext c) {
+        Collection<Row> inputPartitions = inputPartition.getArray(1); // 1 -> value
+        List<Row> sortedRowsAsList = new ArrayList<Row>(inputPartitions);
+        for (int idx = 0; idx < sortedRowsAsList.size(); idx++) {
+          int lowerIndex = idx - fieldAgg.lowerLimit;
+          int upperIndex = idx + fieldAgg.upperLimit + 1;
+          lowerIndex = lowerIndex < 0 ? 0 : lowerIndex;
+          upperIndex = upperIndex > sortedRowsAsList.size() ? sortedRowsAsList.size() : upperIndex;
+          List<Row> aggRange = sortedRowsAsList.subList(lowerIndex, upperIndex);
+
+          // Just concept-proof
+          // Assume that aggFun = SUM
+          // Assume dataType = INTEGER
+          final Combine.CombineFn<Integer, int[], Integer> aggFunction =
+              (Combine.CombineFn<Integer, int[], Integer>)
+                  BeamBuiltinAggregations.create("SUM", Schema.FieldType.INT32);
+          int[] aggAccumulator = aggFunction.createAccumulator();
+
+          // Assume a simple expression within SUM($aUniqueDirectField)
+          final int aggFieldIndex = fieldAgg.inputFields.get(0);
+
+          for (Row aggRow : aggRange) {
+            Integer valueToAgg = aggRow.getInt32(aggFieldIndex);
+            aggFunction.addInput(aggAccumulator, valueToAgg);
+          }
+          Integer aggOutput = aggFunction.extractOutput(aggAccumulator);
+          List<Object> fieldValues =
+              Lists.newArrayListWithCapacity(sortedRowsAsList.get(idx).getFieldCount());
+          fieldValues.addAll(sortedRowsAsList.get(idx).getValues());
+          fieldValues.add(aggOutput);
+          Row ou = Row.withSchema(outputSchema).addValues(fieldValues).build();
+          out.output(ou);
+        }
+      }
+    };
+  }
+
+  private static DoFn<Row, Row> sortPartition(final FieldAggregation fieldAgg) {

Review comment:
       I wonder whether this transform could be reused here: https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
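Whichever transform ends up doing the sort, the per-partition order must follow every ORDER BY key, and each key has its own direction. In the diff these are recorded in FieldAggregation's `orderKeys` and `orderOrientations` lists. The sketch below is a minimal in-memory version in plain Java. It assumes rows are modeled as `Object[]` with non-null `Comparable` fields, and the class and method names are hypothetical, not Beam APIs. It only works when a whole partition fits in memory. SortValues, by contrast, is built on the sorter extension's buffered external sorter, which is intended for larger inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class PartitionSortSketch {

  /**
   * Builds a row comparator from parallel lists of ORDER BY field indexes and directions
   * (true = ASC), in the spirit of FieldAggregation's orderKeys / orderOrientations.
   * Null handling is deliberately left out; fields are assumed non-null and Comparable.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  static Comparator<Object[]> orderBy(List<Integer> keys, List<Boolean> ascending) {
    Comparator<Object[]> result = (a, b) -> 0; // every row ties until a key is added
    for (int i = 0; i < keys.size(); i++) {
      final int field = keys.get(i);
      Comparator<Object[]> byField = (a, b) -> ((Comparable) a[field]).compareTo(b[field]);
      // Later keys only break ties left by earlier keys.
      result = result.thenComparing(ascending.get(i) ? byField : byField.reversed());
    }
    return result;
  }

  public static void main(String[] args) {
    // One "vegetable" partition from the test data: {item, category, purchases}.
    List<Object[]> partition =
        new ArrayList<>(
            Arrays.asList(
                new Object[] {"kale", "vegetable", 23},
                new Object[] {"cabbage", "vegetable", 9},
                new Object[] {"leek", "vegetable", 2},
                new Object[] {"lettuce", "vegetable", 10}));
    partition.sort(orderBy(List.of(2), List.of(true))); // ORDER BY purchases ASC
    for (Object[] row : partition) {
      // Prints, one per line: leek 2, cabbage 9, lettuce 10, kale 23
      System.out.println(row[0] + " " + row[2]);
    }
  }
}
```

Chaining with `thenComparing` keeps each key's direction independent. This matters for clauses such as `ORDER BY a ASC, b DESC`.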




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] jhnmora000 commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-647809650


   retest this please





[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-647224221


   @jhnmora000 
   
   You can run `./gradlew check` to check for style violations.





[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658473749


   Run Java PreCommit





[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-647196694


   retest this please





[GitHub] [beam] jhnmora000 commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r442376683



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * A simple Analytic Functions experiment for BeamSQL created in order to understand the query
+ * processing workflow of BeamSQL and Calcite.
+ */
+public class BeamAnalyticFunctionsExperimentTest extends BeamSqlDslBase {
+
+  /**
+   * Table schema and data taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#produce_table
+   *
+   * <p>Compute a cumulative sum query taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#compute_a_cumulative_sum
+   */
+  @Test
+  public void testOverCumulativeSum() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    Schema schema =
+        Schema.builder()
+            .addStringField("item")
+            .addStringField("category")
+            .addInt32Field("purchases")
+            .build();
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(
+                Create.of(
+                    TestUtils.rowsBuilderOf(schema)
+                        .addRows(
+                            "kale",
+                            "vegetable",
+                            23,
+                            "orange",
+                            "fruit",
+                            2,
+                            "cabbage",
+                            "vegetable",
+                            9,
+                            "apple",
+                            "fruit",
+                            8,
+                            "leek",
+                            "vegetable",
+                            2,
+                            "lettuce",
+                            "vegetable",
+                            10)
+                        .getRows()))
+            .setRowSchema(schema);
+    String sql =
+        "SELECT item, purchases, category, sum(purchases) over "
+            + "(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
+            + " as total_purchases  FROM PCOLLECTION";

Review comment:
       `BeamCalcRel(expr#0..3=[{inputs}], item=[$t0], purchases=[$t2], category=[$t1], total_purchases=[$t3])
     BeamWindowRel(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [SUM($2)])])
       BeamIOSourceRel(table=[[beam, PCOLLECTION]])`
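For reference, the plan above computes SUM(purchases) over each `category` partition, ordered by `purchases`, with a ROWS frame that runs from the first row of the partition to the current row. In other words, it is a running total per category. The plain-Java sketch below computes the values that testOverCumulativeSum should expect for the six input rows used in the test. The class name is illustrative only and does not exist in Beam.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CumulativeSumSketch {

  static final class Purchase {
    final String item;
    final String category;
    final int purchases;

    Purchase(String item, String category, int purchases) {
      this.item = item;
      this.category = category;
      this.purchases = purchases;
    }
  }

  /**
   * Returns "item=total" strings for SUM(purchases) OVER (PARTITION BY category ORDER BY
   * purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
   */
  static List<String> cumulativeSums(List<Purchase> rows) {
    // PARTITION BY category (LinkedHashMap only keeps the output order readable).
    Map<String, List<Purchase>> partitions = new LinkedHashMap<>();
    for (Purchase p : rows) {
      partitions.computeIfAbsent(p.category, k -> new ArrayList<>()).add(p);
    }
    List<String> out = new ArrayList<>();
    for (List<Purchase> partition : partitions.values()) {
      partition.sort(Comparator.comparingInt(p -> p.purchases)); // ORDER BY purchases ASC
      int running = 0; // UNBOUNDED PRECEDING .. CURRENT ROW is a running total
      for (Purchase p : partition) {
        running += p.purchases;
        out.add(p.item + "=" + running);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Purchase> rows =
        List.of(
            new Purchase("kale", "vegetable", 23),
            new Purchase("orange", "fruit", 2),
            new Purchase("cabbage", "vegetable", 9),
            new Purchase("apple", "fruit", 8),
            new Purchase("leek", "vegetable", 2),
            new Purchase("lettuce", "vegetable", 10));
    // Prints [leek=2, cabbage=11, lettuce=21, kale=44, orange=2, apple=10]
    System.out.println(cumulativeSums(rows));
  }
}
```

No partition has ties on `purchases`, so these totals are deterministic. With ties, a ROWS frame would depend on how the sort breaks them.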







[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658485305


   @jhnmora000 thanks for your contribution!





[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r444548154



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();

Review comment:
       Either add a test for NULLS FIRST / NULLS LAST in ORDER BY, or add a `checkArgument` that rejects them (either works for me).
   
   NULL handling itself is up to you; you can leave it for a future PR.
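A test for this would pin down where nulls land relative to non-null values. In the diff, this choice is captured per key in `orderNulls` (`fc.nullDirection == NullDirection.FIRST`). The minimal sketch below uses plain Java's `Comparator.nullsFirst` / `Comparator.nullsLast` to show what `ORDER BY purchases ASC NULLS FIRST` and `... NULLS LAST` should produce. The input values are made up for illustration, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class NullOrderingSketch {

  /** Sorts ascending, placing nulls first or last (the flag plays the role of orderNulls). */
  static List<Integer> sortAsc(List<Integer> values, boolean nullsFirst) {
    Comparator<Integer> natural = Comparator.naturalOrder();
    List<Integer> sorted = new ArrayList<>(values); // copy; Arrays.asList allows nulls
    sorted.sort(nullsFirst ? Comparator.nullsFirst(natural) : Comparator.nullsLast(natural));
    return sorted;
  }

  public static void main(String[] args) {
    List<Integer> purchases = Arrays.asList(23, null, 9, 2, null);
    System.out.println(sortAsc(purchases, true)); // [null, null, 2, 9, 23]
    System.out.println(sortAsc(purchases, false)); // [2, 9, 23, null, null]
  }
}
```

For a descending key, the same idea applies with `natural.reversed()` wrapped in `nullsFirst` or `nullsLast`, so direction and null placement stay independent.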
   

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+public class BeamWindowRel extends Window implements BeamRelNode {

Review comment:
       Based on your implementation below, it would be great to document what is supported and what the constraints are. You can then update this Javadoc gradually as you add new features.
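One way to follow this suggestion is a class-level Javadoc (opening with `/**`) that lists the supported subset and grows as features land. The sketch below is illustrative only. Its feature list is read off this revision of the diff: SUM over a single INT32 field, PARTITION BY and ORDER BY keys, and a lower frame bound of UNBOUNDED PRECEDING or CURRENT ROW, with PRECEDING/FOLLOWING offsets still marked pending. The stub class and its `isSupportedAggregate` helper are hypothetical and not part of the PR.

```java
/**
 * Hypothetical stand-in for {@code BeamWindowRel}, shown only to illustrate a class-level Javadoc
 * that records what the analytic-function support currently covers.
 *
 * <p>Supported in this revision (concept proof):
 *
 * <ul>
 *   <li>{@code SUM} over a single {@code INT32} input field;
 *   <li>{@code PARTITION BY} and {@code ORDER BY} over input fields;
 *   <li>a frame lower bound of {@code UNBOUNDED PRECEDING} or {@code CURRENT ROW}.
 * </ul>
 *
 * <p>Not yet supported: other aggregate functions and field types, and {@code n PRECEDING} /
 * {@code n FOLLOWING} lower bounds.
 */
public class WindowRelJavadocSketch {

  /** Returns true if this revision's concept proof can evaluate the given aggregate. */
  static boolean isSupportedAggregate(String function, String fieldType) {
    return "SUM".equals(function) && "INT32".equals(fieldType);
  }

  public static void main(String[] args) {
    System.out.println(isSupportedAggregate("SUM", "INT32")); // true
    System.out.println(isSupportedAggregate("AVG", "INT32")); // false
  }
}
```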

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending

Review comment:
       It is better to use `checkArgument` to stop execution when you hit an unsupported case.
   
   See: https://www.baeldung.com/guava-preconditions
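The sketch below shows that pattern in plain Java so it runs without Guava. A local `checkArgument` helper stands in for Guava's `Preconditions.checkArgument(boolean, Object)`, which likewise throws `IllegalArgumentException`. A hypothetical `BoundKind` enum stands in for the cases the diff checks on Calcite's bounds (`isCurrentRow()`, `isPreceding()`, `isFollowing()`). The sketch also represents unbounded edges explicitly rather than through an `Integer.MAX_VALUE` sentinel. In the `aggField` DoFn of this file, `idx + fieldAgg.upperLimit + 1` overflows `int` when `upperLimit` is `Integer.MAX_VALUE`, so if an unbounded upper bound keeps that default, `subList` would fail.

```java
public class FrameBoundsSketch {

  /** Hypothetical stand-in for the window-bound cases the diff checks on Calcite's bounds. */
  enum BoundKind {
    UNBOUNDED_PRECEDING,
    PRECEDING,
    CURRENT_ROW,
    FOLLOWING,
    UNBOUNDED_FOLLOWING
  }

  /** Local equivalent of Guava's Preconditions.checkArgument(boolean, Object). */
  static void checkArgument(boolean expression, Object errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(String.valueOf(errorMessage));
    }
  }

  /**
   * Returns {start, endExclusive} of the ROWS frame for the row at idx in a sorted partition of
   * the given size. Unbounded edges are handled explicitly instead of via an Integer.MAX_VALUE
   * sentinel, so no index arithmetic can overflow.
   */
  static int[] frame(int idx, int size, BoundKind lower, BoundKind upper) {
    checkArgument(
        lower == BoundKind.UNBOUNDED_PRECEDING || lower == BoundKind.CURRENT_ROW,
        "Unsupported lower bound: " + lower);
    checkArgument(
        upper == BoundKind.CURRENT_ROW || upper == BoundKind.UNBOUNDED_FOLLOWING,
        "Unsupported upper bound: " + upper);
    int start = lower == BoundKind.UNBOUNDED_PRECEDING ? 0 : idx;
    int end = upper == BoundKind.UNBOUNDED_FOLLOWING ? size : idx + 1;
    return new int[] {start, end};
  }

  public static void main(String[] args) {
    int[] f = frame(2, 4, BoundKind.UNBOUNDED_PRECEDING, BoundKind.CURRENT_ROW);
    System.out.println(f[0] + ".." + f[1]); // 0..3, i.e. subList(0, 3)
    try {
      frame(2, 4, BoundKind.PRECEDING, BoundKind.CURRENT_ROW); // n PRECEDING is still pending
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Unsupported lower bound: PRECEDING
    }
  }
}
```

Failing fast at planning time gives the user a clear message. By contrast, silently treating a pending bound as unbounded would return wrong results.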

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamCalcRule.java
##########
@@ -37,7 +40,20 @@ private BeamCalcRule() {
 
   @Override
   public boolean matches(RelOptRuleCall x) {
-    return true;
+    boolean hasRexOver = false;

Review comment:
       Add a comment explaining what this code does. Basically, it stops the rule from converting when the expression contains an OVER clause.
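Roughly, this check exists so that BeamCalcRule declines any Calc whose expressions contain an OVER clause, leaving the windowed aggregate to be planned as a BeamWindowRel. The plan posted earlier in the thread shows the resulting shape: BeamCalcRel on top of BeamWindowRel. The sketch below does not use Calcite. It models a hypothetical minimal expression tree to show the recursive "contains OVER" walk such a `matches()` check presumably performs. Calcite's own `RexOver` class may already provide a `containsOver` helper, so that is worth checking before hand-rolling the walk.

```java
import java.util.Arrays;
import java.util.List;

public class ContainsOverSketch {

  /** Hypothetical minimal expression node; isOver marks an OVER (windowed) call. */
  static final class Expr {
    final String name;
    final boolean isOver;
    final List<Expr> operands;

    Expr(String name, boolean isOver, Expr... operands) {
      this.name = name;
      this.isOver = isOver;
      this.operands = Arrays.asList(operands);
    }
  }

  /** Returns true if expr or any nested operand is an OVER call. */
  static boolean containsOver(Expr expr) {
    if (expr.isOver) {
      return true;
    }
    for (Expr operand : expr.operands) {
      if (containsOver(operand)) {
        return true;
      }
    }
    return false;
  }

  /** Mirrors the intent of matches(): decline the rule if any projected expression has OVER. */
  static boolean calcRuleMatches(List<Expr> projects) {
    for (Expr e : projects) {
      if (containsOver(e)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Expr purchases = new Expr("$2", false);
    Expr sumOver = new Expr("SUM OVER", true, purchases);
    Expr plusOne = new Expr("+", false, sumOver, new Expr("1", false));
    System.out.println(calcRuleMatches(Arrays.asList(purchases))); // true
    System.out.println(calcRuleMatches(Arrays.asList(purchases, plusOne))); // false
  }
}
```

The walk must recurse: an OVER call nested inside another expression, such as `SUM(x) OVER (...) + 1`, still has to block the conversion.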

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+public class BeamWindowRel extends Window implements BeamRelNode {

Review comment:
       Add Javadoc for classes (Javadoc comments start with `/**` and end with `*/`).

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending

Review comment:
       Same here: use `checkArgument` for this unsupported case.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(

Review comment:
       Even though you will stop execution for some unsupported cases, I think it is still OK to keep this class definition as is (i.e. there is no need to remove the currently unused fields).
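One way to stop execution on unsupported cases is to fail fast while translating the frame bounds. With the empty `// pending` branches in the diff, a bound like `1 PRECEDING` silently keeps the `Integer.MAX_VALUE` "unbounded" default. The sketch below is hypothetical: `FrameBounds`, `resolve`, and the `BoundKind` enum are illustrative stand-ins for Calcite's bound predicates (`isCurrentRow()`, `isPreceding()`, `isFollowing()`). Only the int encoding (0 for CURRENT ROW, `Integer.MAX_VALUE` for unbounded) comes from the diff.

```java
import java.util.Objects;

// Hypothetical helper, not part of the PR: translate a frame bound into the int
// encoding used by FieldAggregation, rejecting bounds that are still "pending".
class FrameBounds {
  // Simplified stand-in for Calcite's bound predicates (assumption, not Calcite's API).
  enum BoundKind {
    UNBOUNDED,
    CURRENT_ROW,
    PRECEDING,
    FOLLOWING
  }

  // Encoding taken from the diff: Integer.MAX_VALUE means unbounded.
  static final int UNBOUNDED_LIMIT = Integer.MAX_VALUE;

  static int resolve(BoundKind kind) {
    Objects.requireNonNull(kind, "kind");
    switch (kind) {
      case CURRENT_ROW:
        return 0;
      case UNBOUNDED:
        return UNBOUNDED_LIMIT;
      default:
        // Offset bounds (e.g. "1 PRECEDING") are not implemented yet: stop here
        // instead of silently falling back to an unbounded frame.
        throw new UnsupportedOperationException("Unsupported window frame bound: " + kind);
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve(BoundKind.CURRENT_ROW)); // prints 0
    try {
      resolve(BoundKind.PRECEDING);
    } catch (UnsupportedOperationException e) {
      System.out.println(e.getMessage()); // prints Unsupported window frame bound: PRECEDING
    }
  }
}
```

Failing at plan translation time surfaces the limitation to the user immediately. The unused `FieldAggregation` fields can stay in place for when these cases are implemented.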

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();

Review comment:
       Could you add a test that includes DESC in the ORDER BY?
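A DESC test would exercise the `orderByDirections` and `orderByNullDirections` lists collected in `buildPTransform()`. The following hypothetical sketch (not from the PR) shows how those three parallel lists could drive a comparator. It uses `Object[]` rows with `Comparable` fields as a stand-in for Beam `Row`. Following how the diff fills the lists, it assumes `true` means ASC and NULLS FIRST respectively.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical comparator driven by the three parallel ORDER BY lists built in buildPTransform().
class OrderByComparator implements Comparator<Object[]> {
  private final List<Integer> orderKeys;  // field index of each ORDER BY key
  private final List<Boolean> ascending;  // true = ASC, false = DESC
  private final List<Boolean> nullsFirst; // true = NULLS FIRST, false = NULLS LAST

  OrderByComparator(List<Integer> orderKeys, List<Boolean> ascending, List<Boolean> nullsFirst) {
    if (orderKeys.size() != ascending.size() || orderKeys.size() != nullsFirst.size()) {
      throw new IllegalArgumentException("ORDER BY lists must have the same length");
    }
    this.orderKeys = orderKeys;
    this.ascending = ascending;
    this.nullsFirst = nullsFirst;
  }

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public int compare(Object[] a, Object[] b) {
    for (int i = 0; i < orderKeys.size(); i++) {
      int field = orderKeys.get(i);
      Comparable x = (Comparable) a[field];
      Comparable y = (Comparable) b[field];
      if (x == null && y == null) {
        continue; // equal on this key; try the next one
      }
      if (x == null || y == null) {
        // Null placement is decided by NULLS FIRST/LAST alone, not by ASC/DESC.
        boolean aFirst = (x == null) == nullsFirst.get(i);
        return aFirst ? -1 : 1;
      }
      int cmp = ascending.get(i) ? x.compareTo(y) : y.compareTo(x);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }

  public static void main(String[] args) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[] {1, "a"});
    rows.add(new Object[] {3, "b"});
    rows.add(new Object[] {null, "c"});
    rows.add(new Object[] {2, "d"});
    // ORDER BY field 0 DESC NULLS LAST
    rows.sort(new OrderByComparator(Arrays.asList(0), Arrays.asList(false), Arrays.asList(false)));
    StringBuilder labels = new StringBuilder();
    for (Object[] row : rows) {
      labels.append(row[1]);
    }
    System.out.println(labels); // prints bdac
  }
}
```

A test pairing DESC with both null directions would catch a comparator that accidentally flips null placement together with the sort direction.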

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(
+                                partitionKeysDef,
+                                orderByKeys,
+                                orderByDirections,
+                                orderByNullDirections,
+                                lowerBFinal,
+                                upperBFinal,
+                                anAnalyticGroup.isRows,
+                                argList,
+                                combineFn,
+                                field);
+                        analyticFields.add(fieldAggregation);
+                      });
+            });
+
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    private Combine.CombineFn combineFn;
+    private Schema.Field outputField;
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> inputFields,
+        Combine.CombineFn combineFn,
+        Schema.Field outputField) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = inputFields;
+      this.combineFn = combineFn;
+      this.outputField = outputField;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema schema, List<FieldAggregation> fieldAgg) {
+      this.outputSchema = schema;
+      this.aggFields = fieldAgg;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> inputData = input.get(0);
+      Schema inputSchema = inputData.getSchema();
+      for (FieldAggregation af : aggFields) {
+        if (af.partitionKeys.isEmpty()) {
+          // This section simulates a KV Row,
+          // similar to the output of Group.byFieldIds,
+          // when no partition keys are specified.
+          Schema inputSch = inputData.getSchema();

Review comment:
       The mock key generation here could be simplified with something like:
   
   ```
                 windowedStream
                     .apply(WithKeys.of("dummy"))
                     .apply(GroupByKey.create())
   ```
   
   Note that you may then not need as much schema manipulation.
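For intuition, the cumulative-sum case with and without `PARTITION BY` can be sketched in plain Java. Rows are grouped by their partition key, or by one constant key when there are no partition keys. This is the in-memory analogue of `WithKeys.of("dummy")` followed by `GroupByKey.create()`. Each group is then sorted and summed.

`CumulativeSumSketch`, `Rec`, and `cumulativeSum` are hypothetical names. The sketch assumes ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW semantics. It does not give peer rows the same value as RANGE framing would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the PR's implementation: running SUM per partition.
class CumulativeSumSketch {
  // Constant key used when the query has no PARTITION BY, mirroring WithKeys.of("dummy").
  static final String DUMMY_KEY = "dummy";

  // Hypothetical input record: partition value, ORDER BY value, and the value to sum.
  static final class Rec {
    final String partition;
    final int orderValue;
    final long amount;

    Rec(String partition, int orderValue, long amount) {
      this.partition = partition;
      this.orderValue = orderValue;
      this.amount = amount;
    }
  }

  // Returns, per key, the running sums in ORDER BY order (ROWS framing; ties keep input order).
  static Map<String, List<Long>> cumulativeSum(List<Rec> input, boolean hasPartitionKeys) {
    // Step 1: group by the partition key, or by a single constant key.
    Map<String, List<Rec>> groups = new LinkedHashMap<>();
    for (Rec r : input) {
      String key = hasPartitionKeys ? r.partition : DUMMY_KEY;
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
    }
    // Step 2: sort each group by the ORDER BY value and accumulate.
    Map<String, List<Long>> out = new LinkedHashMap<>();
    for (Map.Entry<String, List<Rec>> e : groups.entrySet()) {
      List<Rec> part = new ArrayList<>(e.getValue());
      part.sort(Comparator.comparingInt((Rec r) -> r.orderValue)); // stable sort
      List<Long> sums = new ArrayList<>();
      long running = 0;
      for (Rec r : part) {
        running += r.amount;
        sums.add(running);
      }
      out.put(e.getKey(), sums);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Rec> input = Arrays.asList(
        new Rec("x", 2, 10), new Rec("y", 1, 5), new Rec("x", 1, 1), new Rec("y", 2, 7));
    System.out.println(cumulativeSum(input, true));  // prints {x=[1, 11], y=[5, 12]}
    System.out.println(cumulativeSum(input, false)); // prints {dummy=[5, 6, 16, 23]}
  }
}
```

In a Beam pipeline, the grouping step would presumably be the `WithKeys`/`GroupByKey` pair suggested above, with the per-group sort-and-accumulate loop running in a `DoFn` over the grouped values.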

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending

Review comment:
       Same

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending

Review comment:
       Same
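Once the `// pending` branches are filled in, the ROWS frame for a given row could be computed from the `lowerLimit`/`upperLimit` pair. This sketch reuses the diff's encoding (0 for CURRENT ROW, `Integer.MAX_VALUE` for unbounded). It additionally assumes, hypothetically, that a positive `lowerLimit` counts rows PRECEDING and a positive `upperLimit` counts rows FOLLOWING. A frame such as `1 FOLLOWING AND 3 FOLLOWING` would need a richer encoding. `FrameRange.rowsFrame` is an illustrative name.

```java
import java.util.Arrays;

// Hypothetical helper: inclusive [start, end] row indices of a ROWS frame.
class FrameRange {
  // Encoding from the diff: Integer.MAX_VALUE = unbounded, 0 = CURRENT ROW.
  static final int UNBOUNDED = Integer.MAX_VALUE;

  // Assumption (not in the PR): lowerLimit = rows PRECEDING, upperLimit = rows FOLLOWING.
  static int[] rowsFrame(int i, int n, int lowerLimit, int upperLimit) {
    if (n <= 0 || i < 0 || i >= n) {
      throw new IllegalArgumentException("row index out of range");
    }
    if (lowerLimit < 0 || upperLimit < 0) {
      throw new IllegalArgumentException("negative offsets are not supported");
    }
    int start = lowerLimit == UNBOUNDED ? 0 : Math.max(0, i - lowerLimit);
    // Use long arithmetic so i + upperLimit cannot overflow.
    long end = upperLimit == UNBOUNDED ? n - 1 : Math.min(n - 1L, (long) i + upperLimit);
    return new int[] {start, (int) end};
  }

  public static void main(String[] args) {
    // UNBOUNDED PRECEDING AND CURRENT ROW (cumulative sum), row 2 of 5
    System.out.println(Arrays.toString(rowsFrame(2, 5, UNBOUNDED, 0))); // prints [0, 2]
    // 1 PRECEDING AND 1 FOLLOWING, last row of 5
    System.out.println(Arrays.toString(rowsFrame(4, 5, 1, 1))); // prints [3, 4]
  }
}
```

This only covers ROWS framing (`isRows == true`). RANGE framing would have to widen the frame to include peer rows with equal ORDER BY values.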

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+
+public class BeamWindowRel extends Window implements BeamRelNode {
+  public BeamWindowRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelNode input,
+      List<RexLiteral> constants,
+      RelDataType rowType,
+      List<Group> groups) {
+    super(cluster, traitSet, input, constants, rowType, groups);
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    Schema outputSchema = CalciteUtils.toSchema(getRowType());
+    final List<FieldAggregation> analyticFields = Lists.newArrayList();
+    this.groups.stream()
+        .forEach(
+            anAnalyticGroup -> {
+              List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList();
+              List<Integer> orderByKeys = Lists.newArrayList();
+              List<Boolean> orderByDirections = Lists.newArrayList();
+              List<Boolean> orderByNullDirections = Lists.newArrayList();
+              anAnalyticGroup.orderKeys.getFieldCollations().stream()
+                  .forEach(
+                      fc -> {
+                        orderByKeys.add(fc.getFieldIndex());
+                        orderByDirections.add(
+                            fc.direction == RelFieldCollation.Direction.ASCENDING);
+                        orderByNullDirections.add(
+                            fc.nullDirection == RelFieldCollation.NullDirection.FIRST);
+                      });
+              int lowerB = Integer.MAX_VALUE; // Unbounded by default
+              int upperB = Integer.MAX_VALUE; // Unbounded by default
+              if (anAnalyticGroup.lowerBound.isCurrentRow()) {
+                lowerB = 0;
+              } else if (anAnalyticGroup.lowerBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.lowerBound.isFollowing()) {
+                // pending
+              }
+              if (anAnalyticGroup.upperBound.isCurrentRow()) {
+                upperB = 0;
+              } else if (anAnalyticGroup.upperBound.isPreceding()) {
+                // pending
+              } else if (anAnalyticGroup.upperBound.isFollowing()) {
+                // pending
+              }
+              final int lowerBFinal = lowerB;
+              final int upperBFinal = upperB;
+              List<AggregateCall> aggregateCalls = anAnalyticGroup.getAggregateCalls(this);
+              aggregateCalls.stream()
+                  .forEach(
+                      anAggCall -> {
+                        List<Integer> argList = anAggCall.getArgList();
+                        Schema.Field field =
+                            CalciteUtils.toField(anAggCall.getName(), anAggCall.getType());
+                        Combine.CombineFn combineFn =
+                            AggregationCombineFnAdapter.createCombineFn(
+                                anAggCall, field, anAggCall.getAggregation().getName());
+                        FieldAggregation fieldAggregation =
+                            new FieldAggregation(
+                                partitionKeysDef,
+                                orderByKeys,
+                                orderByDirections,
+                                orderByNullDirections,
+                                lowerBFinal,
+                                upperBFinal,
+                                anAnalyticGroup.isRows,
+                                argList,
+                                combineFn,
+                                field);
+                        analyticFields.add(fieldAggregation);
+                      });
+            });
+
+    return new Transform(outputSchema, analyticFields);
+  }
+
+  private static class FieldAggregation implements Serializable {
+
+    private List<Integer> partitionKeys;
+    private List<Integer> orderKeys;
+    private List<Boolean> orderOrientations;
+    private List<Boolean> orderNulls;
+    private int lowerLimit = Integer.MAX_VALUE;
+    private int upperLimit = Integer.MAX_VALUE;
+    private boolean rows = true;
+    private List<Integer> inputFields;
+    private Combine.CombineFn combineFn;
+    private Schema.Field outputField;
+
+    public FieldAggregation(
+        List<Integer> partitionKeys,
+        List<Integer> orderKeys,
+        List<Boolean> orderOrientations,
+        List<Boolean> orderNulls,
+        int lowerLimit,
+        int upperLimit,
+        boolean rows,
+        List<Integer> inputFields,
+        Combine.CombineFn combineFn,
+        Schema.Field outputField) {
+      this.partitionKeys = partitionKeys;
+      this.orderKeys = orderKeys;
+      this.orderOrientations = orderOrientations;
+      this.orderNulls = orderNulls;
+      this.lowerLimit = lowerLimit;
+      this.upperLimit = upperLimit;
+      this.rows = rows;
+      this.inputFields = inputFields;
+      this.combineFn = combineFn;
+      this.outputField = outputField;
+    }
+  }
+
+  @Override
+  public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    return inputStat;
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {

Review comment:
       Add a function Javadoc describing your choice of cost and why.
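For illustration, here is the kind of Javadoc that could sit on the cost formula in the later revision of `beamComputeSelfCost` quoted further down this thread. That formula multiplies the input row count and rate by `1f + 0.125f`. `WindowCostSketch` and `selfCost` are hypothetical names. The stated rationale for the 12.5% overhead is an assumption, and the author would need to replace it with the real reasoning.

```java
/**
 * Hypothetical sketch: the cost formula from BeamWindowRel.beamComputeSelfCost, pulled into a
 * standalone class so the requested Javadoc can be shown. The rationale text is an assumption.
 */
public final class WindowCostSketch {

  /** Overhead factor applied to the input statistics; mirrors {@code 1f + 0.125f}. */
  static final float MULTIPLIER = 1f + 0.125f;

  /**
   * Estimates the self cost of a window (analytic function) node.
   *
   * <p>An analytic function emits exactly one output row per input row, so the estimate starts
   * from the input row count and rate unchanged. ASSUMPTION: the extra 12.5% stands in for the
   * work of grouping rows into partitions and sorting each partition before aggregating; the
   * real justification should come from the PR author.
   *
   * @param inputRowCount estimated row count of the input
   * @param inputRate estimated rate of the input
   * @return {@code {rowCountCost, rateCost}}
   */
  static double[] selfCost(double inputRowCount, double inputRate) {
    return new double[] {inputRowCount * MULTIPLIER, inputRate * MULTIPLIER};
  }

  public static void main(String[] args) {
    double[] cost = selfCost(100, 10);
    System.out.println(cost[0] + ", " + cost[1]); // 112.5, 11.25
  }
}
```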

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamWindowRule.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rule;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamWindowRel;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalWindow;
+
+public class BeamWindowRule extends ConverterRule {

Review comment:
       A class Javadoc would be very helpful.

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
##########
@@ -0,0 +1,317 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
+    float multiplier = 1f + 0.125f;
+    return BeamCostModel.FACTORY.makeCost(
+        inputStat.getRowCount() * multiplier, inputStat.getRate() * multiplier);
+  }
+
+  private static class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    private Schema outputSchema;
+    private List<FieldAggregation> aggFields;
+
+    public Transform(Schema schema, List<FieldAggregation> fieldAgg) {
+      this.outputSchema = schema;
+      this.aggFields = fieldAgg;
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      PCollection<Row> inputData = input.get(0);
+      Schema inputSchema = inputData.getSchema();
+      for (FieldAggregation af : aggFields) {
+        if (af.partitionKeys.isEmpty()) {
+          // When no partition keys are specified, this section simulates a KV row,
+          // similar to the output of Group.byFieldIds.
+          Schema inputSch = inputData.getSchema();
+          Schema mockKeySchema =
+              Schema.of(Schema.Field.of("mock", Schema.FieldType.STRING.withNullable(true)));
+          Schema simulatedKeyValueSchema =
+              Schema.of(
+                  Schema.Field.of("key", Schema.FieldType.row(mockKeySchema)),
+                  Schema.Field.of(
+                      "value", Schema.FieldType.iterable(Schema.FieldType.row(inputSch))));
+          PCollection<Iterable<Row>> apply =
+              inputData.apply(org.apache.beam.sdk.schemas.transforms.Group.globally());
+          inputData =
+              apply
+                  .apply(ParDo.of(uniquePartition(mockKeySchema, simulatedKeyValueSchema)))
+                  .setRowSchema(simulatedKeyValueSchema);
+        } else {
+          org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> myg =
+              org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(af.partitionKeys);
+          inputData = inputData.apply("partitionBy", myg);
+        }
+        inputData =

Review comment:
       Can you skip the sort transform when there is no ORDER BY? Also, please leave a comment saying it should migrate to the SortValues transform in the future.
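The reviewer's request can be modeled in plain Java, without Beam. Rows are grouped by the partition-key fields. An empty key list puts every row in one global partition, which is the case the mock-key branch above handles. Each partition is then sorted only when ORDER BY keys exist. The class and method names are hypothetical. The per-key ascending and nulls-first flags mirror the `orderByDirections` and `orderByNullDirections` lists built in `buildPTransform`. This sketch models the semantics only. It is neither a Beam transform nor the `SortValues` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of the partition + optional sort steps (no Beam APIs). */
public final class PartitionSortSketch {

  /** Builds a row comparator from ORDER BY field indices, directions and null directions. */
  @SuppressWarnings("unchecked")
  static Comparator<List<Object>> orderBy(
      List<Integer> keys, List<Boolean> ascending, List<Boolean> nullsFirst) {
    Comparator<List<Object>> cmp = (a, b) -> 0;
    for (int i = 0; i < keys.size(); i++) {
      int field = keys.get(i);
      // Assumes every non-null ORDER BY value is Comparable with its peers.
      Comparator<Object> natural = (x, y) -> ((Comparable<Object>) x).compareTo(y);
      Comparator<Object> dir = ascending.get(i) ? natural : natural.reversed();
      Comparator<Object> withNulls =
          nullsFirst.get(i) ? Comparator.nullsFirst(dir) : Comparator.nullsLast(dir);
      cmp = cmp.thenComparing(row -> row.get(field), withNulls);
    }
    return cmp;
  }

  static List<List<List<Object>>> partitionAndSort(
      List<List<Object>> rows,
      List<Integer> partitionKeys,
      List<Integer> orderKeys,
      List<Boolean> ascending,
      List<Boolean> nullsFirst) {
    Map<List<Object>, List<List<Object>>> partitions = new LinkedHashMap<>();
    for (List<Object> row : rows) {
      // With no PARTITION BY the key is empty, so every row lands in one global partition.
      List<Object> key = new ArrayList<>();
      for (int k : partitionKeys) {
        key.add(row.get(k));
      }
      partitions.computeIfAbsent(key, unused -> new ArrayList<>()).add(row);
    }
    // Skip the sort entirely when there is no ORDER BY; rows keep their arrival order.
    if (!orderKeys.isEmpty()) {
      Comparator<List<Object>> cmp = orderBy(orderKeys, ascending, nullsFirst);
      for (List<List<Object>> partition : partitions.values()) {
        partition.sort(cmp);
      }
    }
    return new ArrayList<>(partitions.values());
  }

  public static void main(String[] args) {
    List<List<Object>> rows =
        Arrays.asList(
            Arrays.<Object>asList("kale", "vegetable", 23),
            Arrays.<Object>asList("orange", "fruit", 2),
            Arrays.<Object>asList("cabbage", "vegetable", 9));
    // PARTITION BY category (field 1) ORDER BY purchases (field 2) ASC NULLS LAST
    System.out.println(
        partitionAndSort(
            rows, Arrays.asList(1), Arrays.asList(2), Arrays.asList(true), Arrays.asList(false)));
  }
}
```

Building the comparator once per window group and skipping it when there are no ORDER BY keys keeps the no-ORDER-BY path to a single grouping pass.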




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia merged pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia merged pull request #11975:
URL: https://github.com/apache/beam/pull/11975


   





[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r444547096



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java
##########
@@ -0,0 +1,187 @@
+package org.apache.beam.sdk.extensions.sql;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * A simple Analytic Functions experiment for BeamSQL created in order to understand the query
+ * processing workflow of BeamSQL and Calcite.
+ */
+public class BeamAnalyticFunctionsExperimentTest extends BeamSqlDslBase {
+
+  /**
+   * Table schema and data taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#produce_table
+   *
+   * <p>Compute a cumulative sum query taken from
+   * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#compute_a_cumulative_sum
+   */
+  @Test
+  public void testOverCumulativeSum() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    Schema schema =
+        Schema.builder()
+            .addStringField("item")
+            .addStringField("category")
+            .addInt32Field("purchases")
+            .build();
+    PCollection<Row> inputRows =
+        pipeline
+            .apply(
+                Create.of(
+                    TestUtils.rowsBuilderOf(schema)
+                        .addRows(
+                            "kale",
+                            "vegetable",
+                            23,
+                            "orange",
+                            "fruit",
+                            2,
+                            "cabbage",
+                            "vegetable",
+                            9,
+                            "apple",
+                            "fruit",
+                            8,
+                            "leek",
+                            "vegetable",
+                            2,
+                            "lettuce",
+                            "vegetable",
+                            10)
+                        .getRows()))
+            .setRowSchema(schema);
+    String sql =
+        "SELECT item, purchases, category, sum(purchases) over "
+            + "(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
+            + " as total_purchases  FROM PCOLLECTION";

Review comment:
       Thanks. Supporting PARTITION BY and ORDER BY is huge!
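As a sanity check for the expected rows, the query's semantics can be reproduced in plain Java over the six input rows in `testOverCumulativeSum`. This is a hypothetical reference helper and is not part of the PR. A `PCollection` is unordered, so an actual `PAssert` check would compare the results in any order. The order printed here is only the sketch's own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java reference (no Beam) for the cumulative-sum query in testOverCumulativeSum. */
public final class CumulativeSumReference {

  /** One input row with the test schema: item, category, purchases. */
  static final class Purchase {
    final String item;
    final String category;
    final int purchases;

    Purchase(String item, String category, int purchases) {
      this.item = item;
      this.category = category;
      this.purchases = purchases;
    }
  }

  /** Returns one "item,purchases,category,total_purchases" string per input row. */
  static List<String> cumulativeSum(List<Purchase> rows) {
    // PARTITION BY category
    Map<String, List<Purchase>> byCategory = new LinkedHashMap<>();
    for (Purchase p : rows) {
      byCategory.computeIfAbsent(p.category, k -> new ArrayList<>()).add(p);
    }
    List<String> out = new ArrayList<>();
    for (List<Purchase> partition : byCategory.values()) {
      // ORDER BY purchases (ascending)
      partition.sort(Comparator.comparingInt((Purchase p) -> p.purchases));
      // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: a running total within the partition.
      long running = 0;
      for (Purchase p : partition) {
        running += p.purchases;
        out.add(p.item + "," + p.purchases + "," + p.category + "," + running);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Purchase> rows =
        Arrays.asList(
            new Purchase("kale", "vegetable", 23),
            new Purchase("orange", "fruit", 2),
            new Purchase("cabbage", "vegetable", 9),
            new Purchase("apple", "fruit", 8),
            new Purchase("leek", "vegetable", 2),
            new Purchase("lettuce", "vegetable", 10));
    cumulativeSum(rows).forEach(System.out::println);
  }
}
```

On this data the running totals are 2 and 10 for fruit (orange, apple) and 2, 11, 21 and 44 for vegetable (leek, cabbage, lettuce, kale).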







[GitHub] [beam] jhnmora000 commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
jhnmora000 commented on a change in pull request #11975:
URL: https://github.com/apache/beam/pull/11975#discussion_r442376683



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java
##########
@@ -0,0 +1,187 @@
+    String sql =
+        "SELECT item, purchases, category, sum(purchases) over "
+            + "(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
+            + " as total_purchases  FROM PCOLLECTION";

Review comment:
       `LogicalProject(item=[$0], purchases=[$2], category=[$1], total_purchases=[SUM($2) OVER (PARTITION BY $1 ORDER BY $2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
     BeamIOSourceRel(table=[[beam, PCOLLECTION]])`
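The plan above carries the frame `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`. `BeamWindowRel.buildPTransform` encodes a `CURRENT ROW` bound as `0`. Every other bound currently stays at `Integer.MAX_VALUE` (unbounded), and the `N PRECEDING`/`N FOLLOWING` branches are marked as pending. The sketch below shows how a sorted partition could be aggregated under that encoding. `RowsFrameSketch` is a hypothetical name, and treating other values as row offsets is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of evaluating SUM over a ROWS frame on one sorted partition, using the
 * BeamWindowRel encoding: 0 = CURRENT ROW, Integer.MAX_VALUE = UNBOUNDED. Treating any other
 * value as "N rows PRECEDING/FOLLOWING" is an assumption (those branches are still pending).
 */
public final class RowsFrameSketch {

  static final int UNBOUNDED = Integer.MAX_VALUE;

  /**
   * Sums each row's frame within a partition.
   *
   * @param sorted partition values, already in ORDER BY order
   * @param lower rows before the current row to include (UNBOUNDED = partition start)
   * @param upper rows after the current row to include (UNBOUNDED = partition end)
   */
  static List<Long> frameSums(List<Integer> sorted, int lower, int upper) {
    int n = sorted.size();
    List<Long> out = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      // Long arithmetic so that large offsets cannot overflow int.
      int from = lower == UNBOUNDED ? 0 : (int) Math.max(0L, (long) i - lower);
      int to = upper == UNBOUNDED ? n - 1 : (int) Math.min(n - 1L, (long) i + upper);
      long sum = 0;
      for (int j = from; j <= to; j++) {
        sum += sorted.get(j);
      }
      out.add(sum);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> vegetables = Arrays.asList(2, 9, 10, 23); // purchases, ORDER BY purchases
    // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    System.out.println(frameSums(vegetables, UNBOUNDED, 0)); // [2, 11, 21, 44]
    // ROWS BETWEEN 1 PRECEDING AND CURRENT ROW (hypothetical encoding)
    System.out.println(frameSums(vegetables, 1, 0)); // [2, 11, 19, 33]
  }
}
```

This sketch recomputes each frame from scratch, which costs O(n * frame size). A real implementation would more likely keep a running accumulator for the unbounded-preceding case.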







[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-642300874


   retest this please





[GitHub] [beam] amaliujia commented on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-658451009


   retest this please





[GitHub] [beam] amaliujia edited a comment on pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #11975:
URL: https://github.com/apache/beam/pull/11975#issuecomment-647224221


   @jhnmora000 
   
   You can run `./gradlew check` to check for style violations.
   
   I believe the `check` command also runs unit tests, so it might be better to run `check` only for the SQL module.

