You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/06/13 12:41:10 UTC

[1/2] beam git commit: [BEAM-2325] Support Set operator: intersect & except

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL c0171593b -> 315f266a6


[BEAM-2325] Support Set operator: intersect & except


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/81d699e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/81d699e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/81d699e4

Branch: refs/heads/DSL_SQL
Commit: 81d699e4069856827bf33782c024671b48578bf4
Parents: c017159
Author: James Xu <xu...@gmail.com>
Authored: Fri May 19 21:47:10 2017 +0800
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jun 13 14:13:58 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   7 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  15 ++-
 .../beam/dsls/sql/rel/BeamIntersectRel.java     |  58 +++++++++
 .../apache/beam/dsls/sql/rel/BeamMinusRel.java  |  56 +++++++++
 .../dsls/sql/rel/BeamSetOperatorRelBase.java    |  99 +++++++++++++++
 .../apache/beam/dsls/sql/rel/BeamUnionRel.java  |  88 +++++++++++++
 .../beam/dsls/sql/rule/BeamIntersectRule.java   |  51 ++++++++
 .../beam/dsls/sql/rule/BeamMinusRule.java       |  51 ++++++++
 .../beam/dsls/sql/rule/BeamUnionRule.java       |  50 ++++++++
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   3 +-
 .../transform/BeamSetOperatorsTransforms.java   | 113 +++++++++++++++++
 .../dsls/sql/planner/MockedBeamSqlTable.java    |   6 +-
 .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 111 +++++++++++++++++
 .../beam/dsls/sql/rel/BeamMinusRelTest.java     | 110 +++++++++++++++++
 .../sql/rel/BeamSetOperatorRelBaseTest.java     | 122 +++++++++++++++++++
 .../beam/dsls/sql/rel/BeamUnionRelTest.java     |  99 +++++++++++++++
 .../org/apache/beam/dsls/sql/rel/CheckSize.java |  41 +++++++
 17 files changed, 1069 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 1ad62bc..6c73558 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -19,6 +19,7 @@ package org.apache.beam.dsls.sql.planner;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+
 import java.util.Iterator;
 
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
@@ -26,8 +27,11 @@ import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
 import org.apache.beam.dsls.sql.rule.BeamFilterRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
+import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
+import org.apache.beam.dsls.sql.rule.BeamMinusRule;
 import org.apache.beam.dsls.sql.rule.BeamProjectRule;
 import org.apache.beam.dsls.sql.rule.BeamSortRule;
+import org.apache.beam.dsls.sql.rule.BeamUnionRule;
 import org.apache.beam.dsls.sql.rule.BeamValuesRule;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.rel.RelNode;
@@ -42,7 +46,8 @@ public class BeamRuleSets {
   private static final ImmutableSet<RelOptRule> calciteToBeamConversionRules = ImmutableSet
       .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
           BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
-          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE)
+          BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
+          BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE)
       .build();
 
   public static RuleSet[] getRuleSets() {

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index c0d2783..9951536 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,6 +18,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
+
 import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRow;
@@ -79,37 +80,39 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
     PCollection<BeamSqlRow> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
     if (windowFieldIdx != -1) {
-      upstream = upstream.apply("assignEventTimestamp", WithTimestamps
+      upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
           .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
           .setCoder(upstream.getCoder());
     }
 
-    PCollection<BeamSqlRow> windowStream = upstream.apply("window",
+    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window",
         Window.<BeamSqlRow>into(windowFn)
         .triggering(trigger)
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
     BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply("exGroupBy",
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply(
+        stageName + "_exGroupBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                 windowFieldIdx, groupSet)))
         .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
 
     PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
+        .apply(stageName + "_groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
         .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
             IterableCoder.<BeamSqlRow>of(upstream.getCoder())));
 
     BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply(
+        stageName + "_aggregation",
         Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
                 BeamSqlRecordType.from(input.getRowType()))))
         .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
 
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
             BeamSqlRecordType.from(getRowType()), getAggCallList())));
     mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
new file mode 100644
index 0000000..01e1c33
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIntersectRel.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace an {@code Intersect} node.
+ *
+ * <p>This combines two SELECT statements and returns only the rows of the first SELECT
+ * statement that are identical to a row in the second SELECT statement.
+ */
+public class BeamIntersectRel extends Intersect implements BeamRelNode {
+  private BeamSetOperatorRelBase delegate;
+  public BeamIntersectRel(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
new file mode 100644
index 0000000..bee6c11
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamMinusRel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.core.SetOp;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Minus} node.
+ *
+ * <p>Corresponds to the SQL {@code EXCEPT} operator.
+ */
+public class BeamMinusRel extends Minus implements BeamRelNode {
+
+  private BeamSetOperatorRelBase delegate;
+
+  public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
new file mode 100644
index 0000000..271e98f
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSetOperatorsTransforms;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
+ * and {@code BeamMinusRel}.
+ */
+public class BeamSetOperatorRelBase {
+  /**
+   * Set operator type.
+   */
+  public enum OpType implements Serializable {
+    UNION,
+    INTERSECT,
+    MINUS
+  }
+
+  private BeamRelNode beamRelNode;
+  private List<RelNode> inputs;
+  private boolean all;
+  private OpType opType;
+
+  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
+      List<RelNode> inputs, boolean all) {
+    this.beamRelNode = beamRelNode;
+    this.opType = opType;
+    this.inputs = inputs;
+    this.all = all;
+  }
+
+  public PCollection<BeamSqlRow> buildBeamPipeline(
+      PCollectionTuple inputPCollections) throws Exception {
+    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
+        .buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
+        .buildBeamPipeline(inputPCollections);
+
+    WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
+    WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
+    if (!leftWindow.isCompatible(rightWindow)) {
+      throw new IllegalArgumentException(
+          "inputs of " + opType + " have different window strategy: "
+          + leftWindow + " VS " + rightWindow);
+    }
+
+    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
+    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
+
+    // co-group
+    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
+    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
+        .of(leftTag, leftRows.apply(
+            stageName + "_CreateLeftIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .and(rightTag, rightRows.apply(
+            stageName + "_CreateRightIndex", MapElements.via(
+                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
+        .apply(CoGroupByKey.<BeamSqlRow>create());
+    PCollection<BeamSqlRow> ret = coGbkResultCollection
+        .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
+            opType, all)));
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
new file mode 100644
index 0000000..63cf11a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.core.Union;
+
+/**
+ * {@link BeamRelNode} to replace a {@link Union}.
+ *
+ * <p>{@code BeamUnionRel} requires all of its inputs to have the same {@link WindowFn}. From the
+ * SQL perspective, two cases are supported:
+ *
+ * <p>1) Do not use {@code grouped window function}:
+ *
+ * <pre>{@code
+ *   select * from person UNION select * from person
+ * }</pre>
+ *
+ * <p>2) Use the same {@code grouped window function}, with the same param:
+ * <pre>{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ * }</pre>
+ *
+ * <p>Inputs whose grouped window functions differ (here, in the window size) are NOT supported:
+ * <pre>{@code
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
+ *   UNION
+ *   select id, count(*) from person
+ *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
+ * }</pre>
+ */
+public class BeamUnionRel extends Union implements BeamRelNode {
+  private BeamSetOperatorRelBase delegate;
+  public BeamUnionRel(RelOptCluster cluster,
+      RelTraitSet traits,
+      List<RelNode> inputs,
+      boolean all) {
+    super(cluster, traits, inputs, all);
+    this.delegate = new BeamSetOperatorRelBase(this,
+        BeamSetOperatorRelBase.OpType.UNION,
+        inputs, all);
+  }
+
+  public BeamUnionRel(RelInput input) {
+    super(input);
+  }
+
+  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
+    return delegate.buildBeamPipeline(inputPCollections);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
new file mode 100644
index 0000000..70716c5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamIntersectRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Intersect;
+import org.apache.calcite.rel.logical.LogicalIntersect;
+
+/**
+ * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
+ */
+public class BeamIntersectRule extends ConverterRule {
+  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
+  private BeamIntersectRule() {
+    super(LogicalIntersect.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Intersect intersect = (Intersect) rel;
+    final List<RelNode> inputs = intersect.getInputs();
+    return new BeamIntersectRel(
+        intersect.getCluster(),
+        intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(inputs, BeamLogicalConvention.INSTANCE),
+        intersect.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
new file mode 100644
index 0000000..ca93c71
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import java.util.List;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamMinusRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Minus;
+import org.apache.calcite.rel.logical.LogicalMinus;
+
+/**
+ * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
+ */
+public class BeamMinusRule extends ConverterRule {
+  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
+  private BeamMinusRule() {
+    super(LogicalMinus.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamMinusRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Minus minus = (Minus) rel;
+    final List<RelNode> inputs = minus.getInputs();
+    return new BeamMinusRel(
+        minus.getCluster(),
+        minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(inputs, BeamLogicalConvention.INSTANCE),
+        minus.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
new file mode 100644
index 0000000..b8430b9
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.beam.dsls.sql.rel.BeamUnionRel;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.logical.LogicalUnion;
+
+/**
+ * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
+ * {@link BeamUnionRel}.
+ */
+public class BeamUnionRule extends ConverterRule {
+  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
+  private BeamUnionRule() {
+    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
+        "BeamUnionRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Union union = (Union) rel;
+
+    return new BeamUnionRel(
+        union.getCluster(),
+        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
+        union.all
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index f885aaf..a7e9f4b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -32,11 +32,10 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.joda.time.Instant;
 
 /**
- * Repersent a generic ROW record in Beam SQL.
+ * Represent a generic ROW record in Beam SQL.
  *
  */
 public class BeamSqlRow implements Serializable {
-
   private List<Integer> nullFields = new ArrayList<>();
   private List<Object> dataValues;
   private BeamSqlRecordType dataType;

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
new file mode 100644
index 0000000..56b3e14
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSetOperatorsTransforms.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.Iterator;
+
+import org.apache.beam.dsls.sql.rel.BeamSetOperatorRelBase;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.join.CoGbkResult;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Collection of {@code SimpleFunction} and {@code DoFn} helpers used to perform set operations.
+ */
+public abstract class BeamSetOperatorsTransforms {
+  /**
+   * Maps a {@code BeamSqlRow} to a {@code KV} whose key and value are both that same row.
+   */
+  public static class BeamSqlRow2KvFn extends
+      SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      return KV.of(input, input);
+    }
+  }
+
+  /**
+   * Decides, per {@code CoGbkResult} key, which rows a UNION, INTERSECT or MINUS emits.
+   */
+  public static class SetOperatorFilteringDoFn extends
+      DoFn<KV<BeamSqlRow, CoGbkResult>, BeamSqlRow> {
+    private TupleTag<BeamSqlRow> leftTag;
+    private TupleTag<BeamSqlRow> rightTag;
+    private BeamSetOperatorRelBase.OpType opType;
+    // true for the ALL variant: duplicate rows are emitted instead of one row per key
+    private boolean all;
+
+    public SetOperatorFilteringDoFn(TupleTag<BeamSqlRow> leftTag, TupleTag<BeamSqlRow> rightTag,
+        BeamSetOperatorRelBase.OpType opType, boolean all) {
+      this.leftTag = leftTag;
+      this.rightTag = rightTag;
+      this.opType = opType;
+      this.all = all;
+    }
+
+    @ProcessElement public void processElement(ProcessContext ctx) {
+      CoGbkResult coGbkResult = ctx.element().getValue();
+      Iterable<BeamSqlRow> leftRows = coGbkResult.getAll(leftTag);
+      Iterable<BeamSqlRow> rightRows = coGbkResult.getAll(rightTag);
+      switch (opType) {
+        case UNION:
+          if (all) {
+            // emit every left row, then every right row
+            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            while (iter.hasNext()) {
+              ctx.output(iter.next());
+            }
+            iter = rightRows.iterator();
+            while (iter.hasNext()) {
+              ctx.output(iter.next());
+            }
+          } else {
+            // distinct union: emit the grouping key once
+            ctx.output(ctx.element().getKey());
+          }
+          break;
+        case INTERSECT:
+          if (leftRows.iterator().hasNext() && rightRows.iterator().hasNext()) {
+            if (all) {
+              Iterator<BeamSqlRow> iter = leftRows.iterator();
+              while (iter.hasNext()) {
+                ctx.output(iter.next());
+              }
+            } else {
+              ctx.output(ctx.element().getKey());
+            }
+          }
+          break;
+        case MINUS:
+          if (leftRows.iterator().hasNext() && !rightRows.iterator().hasNext()) {
+            Iterator<BeamSqlRow> iter = leftRows.iterator();
+            if (all) {
+              // emit every left row
+              while (iter.hasNext()) {
+                ctx.output(iter.next());
+              }
+            } else {
+              // distinct: emit only the first left row
+              ctx.output(iter.next());
+            }
+          }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index 2ff042d..185e95a 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -20,6 +20,7 @@ package org.apache.beam.dsls.sql.planner;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
@@ -43,7 +44,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
  *
  */
 public class MockedBeamSqlTable extends BaseBeamTable {
-
+  public static final AtomicInteger COUNTER = new AtomicInteger();
   public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
 
   private List<BeamSqlRow> inputRecords;
@@ -122,7 +123,8 @@ public class MockedBeamSqlTable extends BaseBeamTable {
 
   @Override
   public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply(Create.of(inputRecords));
+    return PBegin.in(pipeline).apply(
+        "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
new file mode 100644
index 0000000..02223c2
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamIntersectRelTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamIntersectRel} with INTERSECT and INTERSECT ALL over two mocked tables.
+ */
+public class BeamIntersectRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          4L, 4, 4.0
+      );
+
+  private static MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          3L, 3, 3.0
+      );
+
+  @BeforeClass
+  public static void setUp() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+  }
+
+  @Test
+  public void testIntersect() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+
+        1L, 1, 1.0,
+        2L, 2, 2.0
+    ).getInputRecords());
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testIntersectAll() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " INTERSECT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).satisfies(new CheckSize(3));
+
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getInputRecords());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
new file mode 100644
index 0000000..cd6ba16
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamMinusRelTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamMinusRel} with EXCEPT and EXCEPT ALL over two mocked tables.
+ */
+public class BeamMinusRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private MockedBeamSqlTable orderDetailsTable1 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          4L, 4, 4.0,
+          4L, 4, 4.0
+      );
+
+  private MockedBeamSqlTable orderDetailsTable2 = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          1L, 1, 1.0,
+          2L, 2, 2.0,
+          3L, 3, 3.0
+      );
+
+  @Before
+  public void setUp() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS1", orderDetailsTable1);
+    BeamSqlEnv.registerTable("ORDER_DETAILS2", orderDetailsTable2);
+    MockedBeamSqlTable.CONTENT.clear();
+  }
+
+  @Test
+  public void testExcept() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.DOUBLE, "price",
+            4L, 4, 4.0
+    ).getInputRecords());
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testExceptAll() throws Exception {
+    String sql = "";
+    sql += "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS1 "
+        + " EXCEPT ALL "
+        + "SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS2 ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).satisfies(new CheckSize(2));
+
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+            4L, 4, 4.0,
+            4L, 4, 4.0
+        ).getInputRecords());
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
new file mode 100644
index 0000000..4936062
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSetOperatorRelBaseTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamSetOperatorRelBase} with UNION over windowed aggregations.
+ */
+public class BeamSetOperatorRelBaseTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  public static final Date THE_DATE = new Date();
+  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+          SqlTypeName.TIMESTAMP, "order_time",
+
+          1L, 1, 1.0, THE_DATE,
+          2L, 2, 2.0, THE_DATE);
+
+  @BeforeClass
+  public static void prepare() {
+    THE_DATE.setTime(100000);
+    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+  }
+
+  @Test
+  public void testSameWindow() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    List<BeamSqlRow> expRows =
+        MockedBeamSqlTable.of(
+        SqlTypeName.BIGINT, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.BIGINT, "cnt",
+
+        1L, 1, 1L,
+        2L, 2, 1L
+    ).getInputRecords();
+    // compare rows via valueInString() so that windowStart & windowEnd are ignored
+    PAssert.that(rows.apply(ParDo.of(new ToString()))).containsInAnyOrder(toString(expRows));
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDifferentWindows() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '1' HOUR) "
+        + " UNION SELECT "
+        + " order_id, site_id, count(*) as cnt "
+        + "FROM ORDER_DETAILS GROUP BY order_id, site_id"
+        + ", TUMBLE(order_time, INTERVAL '2' HOUR) ";
+
+    // Use a plain Pipeline rather than the TestPipeline rule because we are
+    // testing an exception; the pipeline will not actually run.
+    Pipeline pipeline1 = Pipeline.create(PipelineOptionsFactory.create());
+    BeamSqlCli.compilePipeline(sql, pipeline1);
+    pipeline.run();
+  }
+
+  static class ToString extends DoFn<BeamSqlRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().valueInString());
+    }
+  }
+
+  static List<String> toString (List<BeamSqlRow> rows) {
+    List<String> strs = new ArrayList<>();
+    for (BeamSqlRow row : rows) {
+      strs.add(row.valueInString());
+    }
+
+    return strs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
new file mode 100644
index 0000000..c2a0597
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamUnionRelTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Tests {@code BeamUnionRel} with UNION and UNION ALL of one mocked table with itself.
+ */
+public class BeamUnionRelTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static MockedBeamSqlTable orderDetailsTable = MockedBeamSqlTable
+      .of(SqlTypeName.BIGINT, "order_id",
+          SqlTypeName.INTEGER, "site_id",
+          SqlTypeName.DOUBLE, "price",
+
+          1L, 1, 1.0,
+          2L, 2, 2.0);
+
+  @BeforeClass
+  public static void prepare() {
+    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailsTable);
+  }
+
+  @Test
+  public void testUnion() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS "
+        + " UNION SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS ";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 1, 1.0,
+            2L, 2, 2.0
+        ).getInputRecords()
+    );
+    pipeline.run();
+  }
+
+  @Test
+  public void testUnionAll() throws Exception {
+    String sql = "SELECT "
+        + " order_id, site_id, price "
+        + "FROM ORDER_DETAILS"
+        + " UNION ALL "
+        + " SELECT order_id, site_id, price "
+        + "FROM ORDER_DETAILS";
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
+    PAssert.that(rows).containsInAnyOrder(
+        MockedBeamSqlTable.of(
+            SqlTypeName.BIGINT, "order_id",
+            SqlTypeName.INTEGER, "site_id",
+            SqlTypeName.DOUBLE, "price",
+
+            1L, 1, 1.0,
+            1L, 1, 1.0,
+            2L, 2, 2.0,
+            2L, 2, 2.0
+        ).getInputRecords()
+    );
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/81d699e4/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
new file mode 100644
index 0000000..ce532df
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/CheckSize.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.junit.Assert;
+
+/**
+ * Assertion function that checks an iterable of {@code BeamSqlRow} has the expected size.
+ */
+public class CheckSize implements SerializableFunction<Iterable<BeamSqlRow>, Void> {
+  private int size;
+  public CheckSize(int size) {
+    this.size = size;
+  }
+  @Override public Void apply(Iterable<BeamSqlRow> input) {
+    int count = 0;
+    for (BeamSqlRow row : input) {
+      count++;
+    }
+    Assert.assertEquals(size, count);
+    return null;
+  }
+}


[2/2] beam git commit: [BEAM-2325] This closes #3183

Posted by jb...@apache.org.
[BEAM-2325] This closes #3183


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/315f266a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/315f266a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/315f266a

Branch: refs/heads/DSL_SQL
Commit: 315f266a6e24cd851a98cba320635c9673ba3dac
Parents: c017159 81d699e
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Tue Jun 13 14:40:56 2017 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Tue Jun 13 14:40:56 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   7 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  15 ++-
 .../beam/dsls/sql/rel/BeamIntersectRel.java     |  58 +++++++++
 .../apache/beam/dsls/sql/rel/BeamMinusRel.java  |  56 +++++++++
 .../dsls/sql/rel/BeamSetOperatorRelBase.java    |  99 +++++++++++++++
 .../apache/beam/dsls/sql/rel/BeamUnionRel.java  |  88 +++++++++++++
 .../beam/dsls/sql/rule/BeamIntersectRule.java   |  51 ++++++++
 .../beam/dsls/sql/rule/BeamMinusRule.java       |  51 ++++++++
 .../beam/dsls/sql/rule/BeamUnionRule.java       |  50 ++++++++
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   3 +-
 .../transform/BeamSetOperatorsTransforms.java   | 113 +++++++++++++++++
 .../dsls/sql/planner/MockedBeamSqlTable.java    |   6 +-
 .../beam/dsls/sql/rel/BeamIntersectRelTest.java | 111 +++++++++++++++++
 .../beam/dsls/sql/rel/BeamMinusRelTest.java     | 110 +++++++++++++++++
 .../sql/rel/BeamSetOperatorRelBaseTest.java     | 122 +++++++++++++++++++
 .../beam/dsls/sql/rel/BeamUnionRelTest.java     |  99 +++++++++++++++
 .../org/apache/beam/dsls/sql/rel/CheckSize.java |  41 +++++++
 17 files changed, 1069 insertions(+), 11 deletions(-)
----------------------------------------------------------------------