Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/06/24 15:28:36 UTC

[GitHub] [beam] Mark-Zeng opened a new pull request #12073: [Beam 9543] Support Match Recognition in Beam SQL

Mark-Zeng opened a new pull request #12073:
URL: https://github.com/apache/beam/pull/12073


   This is a working implementation of `MATCH_RECOGNIZE` in Beam SQL.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #12073:
URL: https://github.com/apache/beam/pull/12073#issuecomment-648953111


   You can run `./gradlew :core:check` to include the style checks. Many style violations can be fixed automatically with `./gradlew spotlessApply`.





[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445150359



##########
File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRelTest.java
##########
@@ -0,0 +1,50 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.apache.beam.sdk.schemas.Schema;
+
+public class BeamMatchRelTest {
+
+  public static final TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void MatchLogicalPlanTest() {
+    Schema schemaType = Schema.builder()
+        .addInt32Field("id")
+        .addStringField("name")
+        .addInt32Field("proctime")
+        .build();
+
+    PCollection<Row> input =
+        pipeline.apply(
+            Create.of(
+                Row.withSchema(schemaType).addValue(1).addValue("a").addValue(1).build())
+                .withRowSchema(schemaType));
+
+    String sql = "SELECT T.aid, T.bid, T.cid " +
+        "FROM PCOLLECTION " +
+        "MATCH_RECOGNIZE (" +
+        "PARTITION BY id " +
+        "ORDER BY proctime " +
+        "MEASURES " +
+        "A.id AS aid, " +
+        "B.id AS bid, " +
+        "C.id AS cid " +
+        "PATTERN (A B C) " +
+        "DEFINE " +
+        "A AS name = 'a', " +
+        "B AS name = 'b', " +
+        "C AS name = 'c' " +
+        ") AS T";
+
+    PCollection<Row> result = input.apply(SqlTransform.query(sql));

Review comment:
       I see. Yes, I think this test is valid. It tests whether the query can be compiled, which includes planning BeamMatchRel.
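The reviewer's point can be made concrete: the test input is a single row with `name = 'a'`, so `PATTERN (A B C)` can never complete and the test exercises planning rather than matching. For intuition about what the runtime side would eventually need to produce, here is a minimal JDK-only sketch of matching a fixed sequence such as `(A B C)` within one partition. Assumptions, all hypothetical and not taken from the PR: `Event` stands in for a Beam `Row` with the test schema; the partition is already sorted by `proctime` (which is what `SortParts` is meant to provide); the pattern has no quantifiers or alternation; and after a match, scanning resumes at the row after the match, i.e. the SQL default `AFTER MATCH SKIP PAST LAST ROW`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for a Beam Row with the test schema (id, name, proctime). */
final class Event {
  final int id;
  final String name;
  final int proctime;

  Event(int id, String name, int proctime) {
    this.id = id;
    this.name = name;
    this.proctime = proctime;
  }
}

/** Sketch: match a fixed, quantifier-free pattern over one partition sorted by proctime. */
final class SequencePatternMatcher {

  /** Returns one list per match; element i is the row bound to pattern variable i. */
  static List<List<Event>> match(List<Event> sortedPartition, List<Predicate<Event>> pattern) {
    List<List<Event>> matches = new ArrayList<>();
    int start = 0;
    while (start + pattern.size() <= sortedPartition.size()) {
      List<Event> candidate = new ArrayList<>();
      for (int i = 0; i < pattern.size(); i++) {
        Event row = sortedPartition.get(start + i);
        if (!pattern.get(i).test(row)) {
          break; // this DEFINE condition fails, so no match starts at `start`
        }
        candidate.add(row);
      }
      if (candidate.size() == pattern.size()) {
        matches.add(candidate);
        start += pattern.size(); // skip past the last matched row
      } else {
        start += 1; // try the next starting row
      }
    }
    return matches;
  }
}
```

With the test's DEFINE clauses as predicates (`"a".equals(e.name)` and so on), the single test row yields no matches, while rows named a, b, c in proctime order yield one match whose rows supply `A.id`, `B.id` and `C.id`. Patterns with quantifiers such as `A B+ C` do not fit this fixed-window loop; they are generally handled with an automaton (for example an NFA) instead.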




[GitHub] [beam] aaltay commented on pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #12073:
URL: https://github.com/apache/beam/pull/12073#issuecomment-649841664


   > cc @aaltay
   
   Nice, thank you for cc'ing me.





[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445149049



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java
##########
@@ -0,0 +1,279 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.util.*;
+
+import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
+
+/** {@link BeamRelNode} to replace a {@link Match} node. */
+public class BeamMatchRel extends Match implements BeamRelNode {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BeamMatchRel.class);
+
+    public BeamMatchRel(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        RelDataType rowType,
+        RexNode pattern,
+        boolean strictStart,
+        boolean strictEnd,
+        Map<String, RexNode> patternDefinitions,
+        Map<String, RexNode> measures,
+        RexNode after,
+        Map<String, ? extends SortedSet<String>> subsets,
+        boolean allRows,
+        List<RexNode> partitionKeys,
+        RelCollation orderKeys,
+        RexNode interval) {
+
+        super(cluster,
+            traitSet,
+            input,
+            rowType,
+            pattern,
+            strictStart,
+            strictEnd,
+            patternDefinitions,
+            measures,
+            after,
+            subsets,
+            allRows,
+            partitionKeys,
+            orderKeys,
+            interval);
+
+    }
+
+    @Override
+    public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+        return BeamCostModel.FACTORY.makeTinyCost(); // return constant costModel for now
+    }
+
+    @Override
+    public NodeStats estimateNodeStats(RelMetadataQuery mq) {
+        // a simple way of getting some estimate data
+        // to be examined further
+        NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(input, mq);
+        double numRows = inputEstimate.getRowCount();
+        double winSize = inputEstimate.getWindow();
+        double rate = inputEstimate.getRate();
+
+        return NodeStats.create(numRows, rate, winSize).multiply(0.5);
+    }
+
+    @Override
+    public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+
+        return new matchTransform(partitionKeys, orderKeys);
+    }
+
+    private static class matchTransform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+        private final List<RexNode> parKeys;
+        private final RelCollation orderKeys;
+
+        public matchTransform(List<RexNode> parKeys, RelCollation orderKeys) {
+            this.parKeys = parKeys;
+            this.orderKeys = orderKeys;
+        }
+
+        @Override
+        public PCollection<Row> expand(PCollectionList<Row> pinput) {
+            checkArgument(
+                pinput.size() == 1,
+                "Wrong number of inputs for %s: %s",
+                BeamMatchRel.class.getSimpleName(),
+                pinput);
+            PCollection<Row> upstream = pinput.get(0);
+
+            Schema collectionSchema = upstream.getSchema();
+
+            Schema.Builder schemaBuilder = new Schema.Builder();
+            for (RexNode i : parKeys) {
+                RexVariable varNode = (RexVariable) i;
+                int index = Integer.parseInt(varNode.getName().substring(1)); // get rid of `$`
+                schemaBuilder.addField(collectionSchema.getField(index));
+            }
+            Schema mySchema = schemaBuilder.build();
+
+            // partition according to the partition keys
+            PCollection<KV<Row, Row>> keyedUpstream = upstream
+                .apply(ParDo.of(new MapKeys(mySchema)));
+
+            // sort within each partition
+            PCollection<KV<Row, ArrayList<Row>>> orderedUpstream = keyedUpstream
+                .apply(Combine.<Row, Row, ArrayList<Row>>perKey(new SortParts(mySchema, orderKeys)));
+
+            return null;
+        }
+
+        private static class SortParts extends Combine.CombineFn<Row, ArrayList<Row>, ArrayList<Row>> {
+
+            private final Schema mySchema;
+            private final List<RelFieldCollation> orderKeys;
+
+            public SortParts(Schema mySchema, RelCollation orderKeys) {
+                this.mySchema = mySchema;
+                List<RelFieldCollation> revOrderKeys = orderKeys.getFieldCollations();
+                Collections.reverse(revOrderKeys);
+                this.orderKeys = revOrderKeys;
+            }
+
+            @Override
+            public ArrayList<Row> createAccumulator() {
+                return new ArrayList<Row>();
+            }
+
+            @Override
+            public ArrayList<Row> addInput(ArrayList<Row> Accum, Row inRow) {
+                Accum.add(inRow);
+                return Accum;
+            }
+
+            @Override
+            public ArrayList<Row> mergeAccumulators(Iterable<ArrayList<Row>> Accums) {
+                ArrayList<Row> aggAccum = new ArrayList<Row>();
+                for (ArrayList<Row> i : Accums) {
+                    aggAccum.addAll(i);
+                }
+                return aggAccum;
+            }
+
+            @Override
+            public ArrayList<Row> extractOutput(ArrayList<Row> rawRows) {
+                for (RelFieldCollation i : orderKeys) {
+                    int fIndex = i.getFieldIndex();
+                    RelFieldCollation.Direction dir = i.getDirection();
+                    if (dir == RelFieldCollation.Direction.ASCENDING) {
+                        Collections.sort(rawRows, new sortComparator(fIndex, true));
+                    }
+                }
+                return rawRows;
+            }
+
+            private class sortComparator implements Comparator<Row> {
+
+                private final int fIndex;
+                private final int inv;
+
+                public sortComparator(int fIndex, boolean inverse) {
+                    this.fIndex = fIndex;
+                    this.inv = inverse ? -1 : 1;
+                }
+
+                @Override
+                public int compare(Row o1, Row o2) {
+                    Schema.Field fd = mySchema.getField(fIndex);
+                    Schema.FieldType dtype = fd.getType();
+                    switch (dtype.getTypeName()) {

Review comment:
       There is already a Row comparator that could be reused here: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java#L373
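Independently of reusing that comparator, a few things in the quoted `SortParts` look worth checking. `extractOutput` sorts with `new sortComparator(fIndex, true)` for ASCENDING keys, and `inverse = true` sets `inv` to -1, so ascending keys would come out descending if `inv` is applied as its name suggests (the rest of `compare` is not quoted). DESCENDING keys are skipped entirely. The constructor reverses the list returned by `getFieldCollations()` in place, which may be an immutable list. And `compare` looks up the field type in `mySchema`, which in `expand()` is the partition-key schema, while `fIndex` indexes the full input row. A single comparator that walks the keys in order avoids the repeated sorts. The sketch below uses only the JDK so it reads without Beam on the classpath: `Object[]` stands in for a Beam `Row`, and the hypothetical `SortKey` stands in for Calcite's `RelFieldCollation`. It assumes non-null, mutually comparable values and ignores null ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for RelFieldCollation: a field index plus a sort direction. */
final class SortKey {
  final int fieldIndex;
  final boolean ascending;

  SortKey(int fieldIndex, boolean ascending) {
    this.fieldIndex = fieldIndex;
    this.ascending = ascending;
  }
}

/** Compares rows (modelled as Object[] of non-null Comparable values) key by key. */
final class MultiKeyRowComparator implements Comparator<Object[]> {
  private final List<SortKey> keys;

  MultiKeyRowComparator(List<SortKey> keys) {
    // Defensive copy: never mutate the caller's (possibly immutable) list.
    this.keys = new ArrayList<>(keys);
  }

  @Override
  @SuppressWarnings("unchecked")
  public int compare(Object[] left, Object[] right) {
    for (SortKey key : keys) {
      Comparable<Object> l = (Comparable<Object>) left[key.fieldIndex];
      int cmp = Integer.signum(l.compareTo(right[key.fieldIndex]));
      if (cmp != 0) {
        // The first differing key decides; flip the sign for descending keys.
        return key.ascending ? cmp : -cmp;
      }
    }
    return 0; // equal on every sort key
  }
}
```

In the PR itself the values would come from the Beam `Row` and the directions from `RelFieldCollation.getDirection()`; the BeamSortRel comparator linked above remains the natural thing to reuse.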







[GitHub] [beam] amaliujia commented on pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #12073:
URL: https://github.com/apache/beam/pull/12073#issuecomment-649815316


   cc @aaltay 





[GitHub] [beam] Mark-Zeng commented on a change in pull request #12073: [Beam-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
Mark-Zeng commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r444983825



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -20,25 +20,7 @@
 import java.util.List;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregateProjectMergeRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOPushDownRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinAssociateRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinPushThroughJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamTableFunctionScanRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUncollectRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnnestRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamValuesRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.*;

Review comment:
       This is probably the default import-folding behavior of my IntelliJ IDEA. I didn't notice it happened; I will expand the wildcard import into explicit imports in a later commit.







[GitHub] [beam] Mark-Zeng closed pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
Mark-Zeng closed pull request #12073:
URL: https://github.com/apache/beam/pull/12073


   





[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445147153



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java
##########
@@ -0,0 +1,279 @@
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Array;
+import java.util.*;

Review comment:
       Same here: import each class explicitly.
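Concretely, for BeamMatchRel.java the wildcard could be replaced along these lines. This list covers only the `java.util` types visible in the quoted excerpt (`Map`, `SortedSet` and `List` in the constructor; `ArrayList`, `Collections` and `Comparator` in `SortParts`), so code outside the excerpt may need more. `java.sql.Array` does not appear to be used in the excerpt and could probably be dropped.

```java
// Explicit replacements for `import java.util.*;` (types seen in the quoted excerpt only).
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
```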







[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #12073:
URL: https://github.com/apache/beam/pull/12073#discussion_r445051086



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -20,25 +20,7 @@
 import java.util.List;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregateProjectMergeRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCoGBKJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamEnumerableConverterRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOPushDownRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIntersectRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinAssociateRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamJoinPushThroughJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamMinusRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSideInputLookupJoinRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamSortRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamTableFunctionScanRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUncollectRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnionRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnnestRule;
-import org.apache.beam.sdk.extensions.sql.impl.rule.BeamValuesRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.*;

Review comment:
       Yes. IntelliJ has a setting that controls when it switches to `*` imports: https://stackoverflow.com/questions/3348816/intellij-never-use-wildcard-imports/25916603







[GitHub] [beam] Mark-Zeng commented on pull request #12073: [Beam-9543] Support Match Recognition in Beam SQL

Posted by GitBox <gi...@apache.org>.
Mark-Zeng commented on pull request #12073:
URL: https://github.com/apache/beam/pull/12073#issuecomment-648892488


   R: @amaliujia 

