You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/06/13 12:41:10 UTC
[1/2] beam git commit: [BEAM-2325] Support Set operator: intersect & except
Repository: beam
Updated Branches:
refs/heads/DSL_SQL c0171593b -> 315f266a6
[BEAM-2325] Support Set operator: intersect & except
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81d699e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81d699e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81d699e4
Branch: refs/heads/DSL_SQL
Commit: 81d699e4069856827bf33782c024671b48578bf4
Parents: c017159
Author: James Xu <xu...@gmail.com>
Authored: Fri May 19 21:47:10 2017 +0800
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jun 13 14:13:58 2017 +0200
----------------------------------------------------------------------
.../beam/dsls/sql/planner/BeamRuleSets.java | 7 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 15 ++-
.../beam/dsls/sql/rel/BeamIntersectRel.java | 58 +++++++++
.../apache/beam/dsls/sql/rel/BeamMinusRel.java | 56 +++++++++
.../dsls/sql/rel/BeamSetOperatorRelBase.java | 99 +++++++++++++++
.../apache/beam/dsls/sql/rel/BeamUnionRel.java | 88 +++++++++++++
.../beam/dsls/sql/rule/BeamIntersectRule.java | 51 ++++++++
.../beam/dsls/sql/rule/BeamMinusRule.java | 51 ++++++++
.../beam/dsls/sql/rule/BeamUnionRule.java | 50 ++++++++
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 3 +-
.../transform/BeamSetOperatorsTransforms.java | 113 +++++++++++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 6 +-
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 111 +++++++++++++++++
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 110 +++++++++++++++++
.../sql/rel/BeamSetOperatorRelBaseTest.java | 122 +++++++++++++++++++
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 99 +++++++++++++++
.../org/apache/beam/dsls/sql/rel/CheckSize.java | 41 +++++++
17 files changed, 1069 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 1ad62bc..6c73558 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.planner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+
import java.util.Iterator;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -26,8 +27,11 @@ import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
import org.apache.beam.dsls.sql.rule.BeamFilterRule;
import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
+import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
+import org.apache.beam.dsls.sql.rule.BeamMinusRule;
import org.apache.beam.dsls.sql.rule.BeamProjectRule;
import org.apache.beam.dsls.sql.rule.BeamSortRule;
+import org.apache.beam.dsls.sql.rule.BeamUnionRule;
import org.apache.beam.dsls.sql.rule.BeamValuesRule;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.rel.RelNode;
@@ -42,7 +46,8 @@ public class BeamRuleSets {
private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
.<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
- BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE)
+ BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
+ BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE)
.build();
public static RuleSet[] getRuleSets() {
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index c0d2783..9951536 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,6 +18,7 @@
package org.apache.beam.dsls.sql.rel;
import java.util.List;
+
import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -79,37 +80,39 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
PCollection<BeamSqlRow> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
if (windowFieldIdx != -1) {
- upstream = upstream.apply("assignEventTimestamp", WithTimestamps
+ upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
.<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
.setCoder(upstream.getCoder());
}
- PCollection<BeamSqlRow> windowStream = upstream.apply("window",
+ PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window",
Window.<BeamSqlRow>into(windowFn)
.triggering(trigger)
.withAllowedLateness(allowedLatence)
.accumulatingFiredPanes());
BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
- PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply("exGroupBy",
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply(
+ stageName + "_exGroupBy",
WithKeys
.of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
windowFieldIdx, groupSet)))
.setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
- .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
+ .apply(stageName + "_groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
.setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
IterableCoder.<BeamSqlRow>of(upstream.getCoder())));
BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
- PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
+ PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply(
+ stageName + "_aggregation",
Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
BeamSqlRecordType.from(input.getRowType()))))
.setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
+ PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
BeamSqlRecordType.from(getRowType()), getAggCallList())));
mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
new file mode 100644
index 0000000..01e1c33
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} that replaces a Calcite {@code Intersect} node.
+ *
+ * <p>It combines two SELECT statements and keeps only those rows of the first SELECT
+ * statement that are identical to a row in the second SELECT statement. The actual
+ * pipeline is built by a {@code BeamSetOperatorRelBase} delegate.
+ */
+public class BeamIntersectRel extends Intersect implements BeamRelNode {
+  /** Builds the Beam pipeline on behalf of this node. */
+  private final BeamSetOperatorRelBase delegate;
+
+  public BeamIntersectRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    BeamSetOperatorRelBase.OpType op = BeamSetOperatorRelBase.OpType.INTERSECT;
+    this.delegate = new BeamSetOperatorRelBase(this, op, inputs, all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
new file mode 100644
index 0000000..bee6c11
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} that replaces a Calcite {@code Minus} node, i.e. the SQL
+ * {@code EXCEPT} operator.
+ *
+ * <p>Pipeline construction is handed off to a {@code BeamSetOperatorRelBase}.
+ */
+public class BeamMinusRel extends Minus implements BeamRelNode {
+  private final BeamSetOperatorRelBase delegate;
+
+  public BeamMinusRel(
+      RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate =
+        new BeamSetOperatorRelBase(this, BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
new file mode 100644
index 0000000..271e98f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel} and
+ * {@code BeamMinusRel}. Both inputs are keyed by the whole row, co-grouped, and then
+ * filtered by {@code BeamSetOperatorsTransforms.SetOperatorFilteringDoFn}.
+ */
+public class BeamSetOperatorRelBase {
+  /** Set operator type. */
+  public enum OpType implements Serializable { UNION, INTERSECT, MINUS }
+
+  private final BeamRelNode beamRelNode;
+  private final List<RelNode> inputs;
+  private final boolean all;
+  private final OpType opType;
+
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
+      List<RelNode> inputs, boolean all) {
+    this.beamRelNode = beamRelNode;
+    this.opType = opType;
+    this.inputs = inputs;
+    this.all = all;
+  }
+
+  /**
+   * Builds both inputs, checks that their windows are compatible and applies the set operator.
+   */
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections) throws Exception {
+    // Only inputs 0 and 1 are read below; fail instead of silently dropping any extra input.
+    if (inputs.size() != 2) {
+      throw new IllegalArgumentException(
+          opType + " only supports 2 inputs, but got " + inputs.size());
+    }
+    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+        .buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+        .buildBeamPipeline(inputPCollections);
+
+    WindowFn<?, ?> leftWindow = leftRows.getWindowingStrategy().getWindowFn();
+    WindowFn<?, ?> rightWindow = rightRows.getWindowingStrategy().getWindowFn();
+    if (!leftWindow.isCompatible(rightWindow)) {
+      throw new IllegalArgumentException(
+          "inputs of " + opType + " have different window strategy: "
+          + leftWindow + " VS " + rightWindow);
+    }
+
+    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
+    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+
+    // co-group; every step carries the stage name so several set operators stay unique
+    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
+    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+        .of(leftTag, leftRows.apply(stageName + "_CreateLeftIndex",
+            MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .and(rightTag, rightRows.apply(stageName + "_CreateRightIndex",
+            MapElements.via(new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .apply(stageName + "_CoGroupByKey", CoGroupByKey.<BeamSqlRow>create());
+    return coGbkResultCollection.apply(stageName + "_SetOperatorFiltering", ParDo.of(
+        new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag, opType, all)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
new file mode 100644
index 0000000..63cf11a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Union}.
+ *
+ * <p>{@code BeamUnionRel} requires all of its inputs to use the same {@link WindowFn}. From
+ * the SQL perspective, two cases are supported:
+ *
+ * <p>1) Do not use {@code grouped window function}:
+ * <pre>{@code
+ * select * from person UNION select * from person
+ * }</pre>
+ *
+ * <p>2) Use the same {@code grouped window function}, with the same param:
+ * <pre>{@code
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * UNION
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * }</pre>
+ *
+ * <p>Inputs with different group functions are NOT supported; building the pipeline fails
+ * with an {@link IllegalArgumentException}:
+ * <pre>{@code
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * UNION
+ * select id, count(*) from person
+ * group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
+ * }</pre>
+ */
+public class BeamUnionRel extends Union implements BeamRelNode {
+  private final BeamSetOperatorRelBase delegate;
+  public BeamUnionRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.UNION, inputs, all);
+  }
+
+  /** Creates the node from a {@link RelInput}; the delegate must be initialized here too. */
+  public BeamUnionRel(RelInput input) {
+    super(input);
+    this.delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.UNION, getInputs(), all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..70716c5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamIntersectRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+
+/**
+ * {@code ConverterRule} that turns a logical {@code Intersect} (Calcite convention
+ * {@code NONE}) into a {@code BeamIntersectRel} in the Beam logical convention.
+ */
+public class BeamIntersectRule extends ConverterRule {
+  /** The single shared instance of this rule. */
+  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+
+  private BeamIntersectRule() {
+    super(LogicalIntersect.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamIntersectRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Intersect logical = (Intersect) rel;
+    // The children have to be converted to the Beam convention as well.
+    List<RelNode> beamInputs = convertList(logical.getInputs(), BeamLogicalConvention.INSTANCE);
+    return new BeamIntersectRel(logical.getCluster(),
+        logical.getTraitSet().replace(BeamLogicalConvention.INSTANCE), beamInputs, logical.all);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..ca93c71
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamMinusRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+/**
+ * {@code ConverterRule} that turns a logical {@code Minus} (Calcite convention
+ * {@code NONE}) into a {@code BeamMinusRel} in the Beam logical convention.
+ */
+public class BeamMinusRule extends ConverterRule {
+  /** The single shared instance of this rule. */
+  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+
+  private BeamMinusRule() {
+    super(LogicalMinus.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamMinusRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Minus logical = (Minus) rel;
+    // The children have to be converted to the Beam convention as well.
+    List<RelNode> beamInputs = convertList(logical.getInputs(), BeamLogicalConvention.INSTANCE);
+    return new BeamMinusRel(logical.getCluster(),
+        logical.getTraitSet().replace(BeamLogicalConvention.INSTANCE), beamInputs, logical.all);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..b8430b9
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace a {@link LogicalUnion} ({@link Union}) with
+ * {@link BeamUnionRel}.
+ */
+public class BeamUnionRule extends ConverterRule {
+  /** Shared instance, registered in {@code BeamRuleSets}. */
+  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+  private BeamUnionRule() {
+    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamUnionRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+    // Inputs are converted to the Beam convention together with the union itself.
+    return new BeamUnionRel(
+        union.getCluster(),
+        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
+        union.all);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index f885aaf..a7e9f4b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -32,11 +32,10 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.joda.time.Instant;
/**
- * Repersent a generic ROW record in Beam SQL.
+ * Represent a generic ROW record in Beam SQL.
*
*/
public class BeamSqlRow implements Serializable {
-
private List<Integer> nullFields = new ArrayList<>();
private List<Object> dataValues;
private BeamSqlRecordType dataType;
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
new file mode 100644
index 0000000..56b3e14
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.Iterator;
+
+import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Collections of {@code PTransform} and {@code DoFn} used to perform Set operations. */
+public abstract class BeamSetOperatorsTransforms {
+  /** Transform a {@code BeamSqlRow} to a {@code KV<BeamSqlRow, BeamSqlRow>}. */
+  public static class BeamSqlRow2KvFn extends
+      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      return KV.of(input, input);
+    }
+  }
+
+  /**
+   * Filter function used for Set operators. For a row occurring m times on the left and n
+   * times on the right, UNION ALL emits m + n copies, INTERSECT ALL min(m, n) copies and
+   * EXCEPT ALL max(m - n, 0) copies; the non-ALL variants emit at most one copy.
+   */
+  public static class SetOperatorFilteringDoFn extends
+      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
+    private final TupleTag<BeamSqlRow> leftTag;
+    private final TupleTag<BeamSqlRow> rightTag;
+    private final BeamSetOperatorRelBase.OpType opType;
+    // true for the ALL variants (duplicates kept), false for the DISTINCT ones.
+    private final boolean all;
+
+    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+        BeamSetOperatorRelBase.OpType opType, boolean all) {
+      this.leftTag = leftTag;
+      this.rightTag = rightTag;
+      this.opType = opType;
+      this.all = all;
+    }
+
+    @ProcessElement public void processElement(ProcessContext ctx) {
+      CoGbkResult coGbkResult = ctx.element().getValue();
+      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
+      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+      switch (opType) {
+        case UNION:
+          if (all) {
+            output(ctx, leftRows, Long.MAX_VALUE);
+            output(ctx, rightRows, Long.MAX_VALUE);
+          } else {
+            ctx.output(ctx.element().getKey());
+          }
+          break;
+        case INTERSECT:
+          if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
+            if (all) {
+              output(ctx, leftRows, count(rightRows)); // min(m, n) copies
+            } else {
+              ctx.output(ctx.element().getKey());
+            }
+          }
+          break;
+        case MINUS:
+          if (all) {
+            output(ctx, leftRows, count(leftRows) - count(rightRows)); // max(m - n, 0) copies
+          } else if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
+            output(ctx, leftRows, 1);
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unsupported set operator: " + opType);
+      }
+    }
+
+    /** Outputs the first {@code limit} rows of {@code rows}; nothing if {@code limit <= 0}. */
+    private void output(ProcessContext ctx, Iterable<BeamSqlRow> rows, long limit) {
+      Iterator<BeamSqlRow> iter = rows.iterator();
+      for (long emitted = 0; emitted < limit && iter.hasNext(); emitted++) {
+        ctx.output(iter.next());
+      }
+    }
+
+    /** Returns the number of rows in {@code rows}. */
+    private long count(Iterable<BeamSqlRow> rows) {
+      long n = 0;
+      for (Iterator<BeamSqlRow> iter = rows.iterator(); iter.hasNext(); iter.next()) {
+        n++;
+      }
+      return n;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index 2ff042d..185e95a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -20,6 +20,7 @@ package org.apache.beam.dsls.sql.planner;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
@@ -43,7 +44,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
*
*/
public class MockedBeamSqlTable extends BaseBeamTable {
-
+ public static final AtomicInteger COUNTER = new AtomicInteger();
public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
private List<BeamSqlRow> inputRecords;
@@ -122,7 +123,8 @@ public class MockedBeamSqlTable extends BaseBeamTable {
 @Override
 public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply(Create.of(inputRecords));
+ // Each call gets its own transform name via the static COUNTER. Presumably this is so that
+ // reading the same mocked table more than once in one pipeline (e.g. both sides of a UNION
+ // over ORDER_DETAILS) does not produce colliding transform names. NOTE(review): confirm.
+ return PBegin.in(pipeline).apply(
+ "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));
 }
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
new file mode 100644
index 0000000..02223c2
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamIntersectRel}, i.e. SQL {@code INTERSECT} and {@code INTERSECT ALL}.
+ */
+public class BeamIntersectRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  // Left input: (1, 1, 1.0) is duplicated; (4, 4, 4.0) has no match on the right.
+  private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          4L, 4, 4.0
+      );
+
+  // Right input: (3, 3, 3.0) has no match on the left.
+  private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          3L, 3, 3.0
+      );
+
+  @BeforeClass
+  public static void setUp() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+  }
+
+  @Test
+  public void testIntersect() throws Exception {
+    String sql = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    // Distinct rows present on both sides.
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getInputRecords());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testIntersectAll() throws Exception {
+    String sql = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    // NOTE(review): (1, 1, 1.0) appears twice on the left but once on the right. Standard SQL
+    // INTERSECT ALL would keep min(2, 1) = 1 copy, whereas this expectation keeps both left
+    // copies. Confirm that is the intended semantics.
+    PAssert.that(rows).satisfies(new CheckSize(3));
+
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getInputRecords());
+
+    // Block until the pipeline finishes, the same way testIntersect does.
+    pipeline.run().waitUntilFinish();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
new file mode 100644
index 0000000..cd6ba16
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamMinusRel}, i.e. SQL {@code EXCEPT} and {@code EXCEPT ALL}.
+ */
+public class BeamMinusRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  // Left input: (1, 1, 1.0) and (4, 4, 4.0) are duplicated; only (4, 4, 4.0) is absent on the right.
+  private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable.of(
+      SqlTypeName.BIGINT, "order_id",
+      SqlTypeName.INTEGER, "site_id",
+      SqlTypeName.DOUBLE, "price",
+      1L, 1, 1.0,
+      1L, 1, 1.0,
+      2L, 2, 2.0,
+      4L, 4, 4.0,
+      4L, 4, 4.0);
+
+  // Right input.
+  private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable.of(
+      SqlTypeName.BIGINT, "order_id",
+      SqlTypeName.INTEGER, "site_id",
+      SqlTypeName.DOUBLE, "price",
+      1L, 1, 1.0,
+      2L, 2, 2.0,
+      3L, 3, 3.0);
+
+  @Before
+  public void setUp() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+    MockedBeamSqlTable.CONTENT.clear();
+  }
+
+  @Test
+  public void testExcept() throws Exception {
+    PCollection<BeamSqlRow> difference = compileExcept(" EXCEPT ");
+    MockedBeamSqlTable expected = MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+        4L, 4, 4.0);
+    PAssert.that(difference).containsInAnyOrder(expected.getInputRecords());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testExceptAll() throws Exception {
+    PCollection<BeamSqlRow> difference = compileExcept(" EXCEPT ALL ");
+    PAssert.that(difference).satisfies(new CheckSize(2));
+
+    MockedBeamSqlTable expected = MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+        4L, 4, 4.0,
+        4L, 4, 4.0);
+    PAssert.that(difference).containsInAnyOrder(expected.getInputRecords());
+
+    pipeline.run();
+  }
+
+  /**
+   * Compiles {@code ORDER_DETAILS1 <operator> ORDER_DETAILS2} into the test pipeline.
+   *
+   * @param operator the set operator keyword(s), padded with spaces, e.g. {@code " EXCEPT "}
+   */
+  private PCollection<BeamSqlRow> compileExcept(String operator) throws Exception {
+    String sql = "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + operator
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+    return BeamSqlCli.compilePipeline(sql, pipeline);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
new file mode 100644
index 0000000..4936062
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSetOperatorRelBase}.
+ */
+public class BeamSetOperatorRelBaseTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  // Fixed event time for every row. It is given its final value here, before orderDetailsTable
+  // below is built from it, instead of being mutated afterwards.
+  public static final Date THE_DATE = new Date(100000L);
+  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          SqlTypeName.TIMESTAMP, "order_time",
+
+          1L, 1, 1.0, THE_DATE,
+          2L, 2, 2.0, THE_DATE);
+
+  @BeforeClass
+  public static void prepare() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+  }
+
+  @Test
+  public void testSameWindow() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    List<BeamSqlRow> expRows =
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.BIGINT, "cnt",
+
+            1L, 1, 1L,
+            2L, 2, 1L
+        ).getInputRecords();
+    // compare valueInString to ignore the windowStart & windowEnd
+    PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows));
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDifferentWindows() throws Exception {
+    // The two sides use different TUMBLE sizes (1 hour vs 2 hours), which must be rejected.
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
+
+    // Compile against a plain Pipeline rather than the TestPipeline rule. The query is expected
+    // to be rejected while the pipeline is being built, so nothing is ever run.
+    Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
+    BeamSqlCli.compilePipeline(sql, pipeline1);
+  }
+
+  /** Maps each row to its {@code valueInString()} form. */
+  static class ToString extends DoFn<BeamSqlRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().valueInString());
+    }
+  }
+
+  /** Returns the {@code valueInString()} form of every row, in list order. */
+  static List<String> toString(List<BeamSqlRow> rows) {
+    List<String> strs = new ArrayList<>();
+    for (BeamSqlRow row : rows) {
+      strs.add(row.valueInString());
+    }
+
+    return strs;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
new file mode 100644
index 0000000..c2a0597
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamUnionRel}, i.e. SQL {@code UNION} and {@code UNION ALL}.
+ */
+public class BeamUnionRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
+  // Two distinct rows; both tests union this table with itself.
+  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable.of(
+      SqlTypeName.BIGINT, "order_id",
+      SqlTypeName.INTEGER, "site_id",
+      SqlTypeName.DOUBLE, "price",
+
+      1L, 1, 1.0,
+      2L, 2, 2.0);
+
+  @BeforeClass
+  public static void prepare() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+  }
+
+  @Test
+  public void testUnion() throws Exception {
+    final String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
+        + " UNION SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS ";
+    final PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(sql, pipeline);
+
+    // The copies from the two sides collapse: each row appears exactly once.
+    final MockedBeamSqlTable expected = MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+
+        1L, 1, 1.0,
+        2L, 2, 2.0);
+    PAssert.that(result).containsInAnyOrder(expected.getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testUnionAll() throws Exception {
+    final String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS"
+        + " UNION ALL " + " SELECT order_id, site_id, price " + "FROM ORDER_DETAILS";
+    final PCollection<BeamSqlRow> result = BeamSqlCli.compilePipeline(sql, pipeline);
+
+    // Every copy from both sides is kept: each row appears twice.
+    final MockedBeamSqlTable expected = MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+
+        1L, 1, 1.0,
+        1L, 1, 1.0,
+        2L, 2, 2.0,
+        2L, 2, 2.0);
+    PAssert.that(result).containsInAnyOrder(expected.getInputRecords());
+    pipeline.run();
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
new file mode 100644
index 0000000..ce532df
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Assert;
+
+/**
+ * Asserts that an {@code Iterable<BeamSqlRow>} contains exactly the expected number of rows.
+ * Used as {@code PAssert.that(rows).satisfies(new CheckSize(n))}.
+ */
+public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+  // Expected number of rows; fixed at construction.
+  private final int size;
+
+  /**
+   * @param size the exact number of rows the checked iterable must contain
+   */
+  public CheckSize(int size) {
+    this.size = size;
+  }
+
+  @Override public Void apply(Iterable<BeamSqlRow> input) {
+    int count = 0;
+    for (BeamSqlRow ignored : input) {
+      count++;
+    }
+    Assert.assertEquals("unexpected number of rows", size, count);
+    return null;
+  }
+}
[2/2] beam git commit: [BEAM-2325] This closes #3183
Posted by jb...@apache.org.
[BEAM-2325] This closes #3183
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/315f266a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/315f266a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/315f266a
Branch: refs/heads/DSL_SQL
Commit: 315f266a6e24cd851a98cba320635c9673ba3dac
Parents: c017159 81d699e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Jun 13 14:40:56 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jun 13 14:40:56 2017 +0200
----------------------------------------------------------------------
.../beam/dsls/sql/planner/BeamRuleSets.java | 7 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 15 ++-
.../beam/dsls/sql/rel/BeamIntersectRel.java | 58 +++++++++
.../apache/beam/dsls/sql/rel/BeamMinusRel.java | 56 +++++++++
.../dsls/sql/rel/BeamSetOperatorRelBase.java | 99 +++++++++++++++
.../apache/beam/dsls/sql/rel/BeamUnionRel.java | 88 +++++++++++++
.../beam/dsls/sql/rule/BeamIntersectRule.java | 51 ++++++++
.../beam/dsls/sql/rule/BeamMinusRule.java | 51 ++++++++
.../beam/dsls/sql/rule/BeamUnionRule.java | 50 ++++++++
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 3 +-
.../transform/BeamSetOperatorsTransforms.java | 113 +++++++++++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 6 +-
.../beam/dsls/sql/rel/BeamIntersectRelTest.java | 111 +++++++++++++++++
.../beam/dsls/sql/rel/BeamMinusRelTest.java | 110 +++++++++++++++++
.../sql/rel/BeamSetOperatorRelBaseTest.java | 122 +++++++++++++++++++
.../beam/dsls/sql/rel/BeamUnionRelTest.java | 99 +++++++++++++++
.../org/apache/beam/dsls/sql/rel/CheckSize.java | 41 +++++++
17 files changed, 1069 insertions(+), 11 deletions(-)
----------------------------------------------------------------------