You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/29 23:35:40 UTC

[1/2] beam git commit: [BEAM-2193] Implement FULL, INNER, and OUTER JOIN: - FULL and INNER supported on all variations of unbounded/bounded joins. - OUTER JOIN supported when outer side is unbounded. - Unbounded/bounded joins implemented via side inputs.

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL ab4b11886 -> 2096da25e


[BEAM-2193] Implement FULL, INNER, and OUTER JOIN:
- FULL and INNER supported on all variations of unbounded/bounded joins.
- OUTER JOIN supported when outer side is unbounded.
- Unbounded/bounded joins implemented via side inputs.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/928cec59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/928cec59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/928cec59

Branch: refs/heads/DSL_SQL
Commit: 928cec597175c363d444331b35ac8793297a242b
Parents: ab4b118
Author: James Xu <xu...@gmail.com>
Authored: Mon May 29 11:11:34 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 29 16:32:23 2017 -0700

----------------------------------------------------------------------
 dsls/pom.xml                                    |   2 +-
 dsls/sql/pom.xml                                |  16 +-
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   6 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  19 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   | 305 +++++++++++++++++++
 .../apache/beam/dsls/sql/rule/BeamJoinRule.java |  53 ++++
 .../beam/dsls/sql/schema/BeamSqlRecordType.java |   2 +-
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   2 +-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |   3 -
 .../dsls/sql/transform/BeamJoinTransforms.java  | 166 ++++++++++
 .../org/apache/beam/dsls/sql/TestUtils.java     | 125 ++++++++
 .../dsls/sql/planner/MockedBeamSqlTable.java    |   5 +-
 .../beam/dsls/sql/planner/MockedTable.java      |  33 ++
 .../dsls/sql/planner/MockedUnboundedTable.java  | 120 ++++++++
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    | 195 ++++++++++++
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  | 242 +++++++++++++++
 .../BeamJoinRelUnboundedVsUnboundedTest.java    | 219 +++++++++++++
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   2 +-
 18 files changed, 1486 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index d932698..a518d03 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -66,7 +66,7 @@
         </plugin>
       </plugins>
     </pluginManagement>
-    
+
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index a2279d5..54f590e 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -157,6 +157,11 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.1</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -193,21 +198,18 @@
       <artifactId>joda-time</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>0.10.1.0</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>com.google.auto.value</groupId>
       <artifactId>auto-value</artifactId>
       <scope>provided</scope>
     </dependency>
-
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 6c73558..552ff8f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -19,15 +19,14 @@ package org.apache.beam.dsls.sql.planner;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-
 import java.util.Iterator;
-
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
 import org.apache.beam.dsls.sql.rule.BeamFilterRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
 import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
 import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
+import org.apache.beam.dsls.sql.rule.BeamJoinRule;
 import org.apache.beam.dsls.sql.rule.BeamMinusRule;
 import org.apache.beam.dsls.sql.rule.BeamProjectRule;
 import org.apache.beam.dsls.sql.rule.BeamSortRule;
@@ -47,7 +46,8 @@ public class BeamRuleSets {
       .<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
           BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
           BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
-          BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE)
+          BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE,
+          BeamJoinRule.INSTANCE)
       .build();
 
   public static RuleSet[] getRuleSets() {

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 701f620..9ec9e9f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -74,40 +74,41 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
       , BeamSqlEnv sqlEnv) throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this) + "_";
 
     PCollection<BeamSqlRow> upstream =
         BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
     if (windowFieldIdx != -1) {
-      upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
-          .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+      upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
+          .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
           .setCoder(upstream.getCoder());
     }
 
-    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window",
-        Window.<BeamSqlRow>into(windowFn)
+    PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+        Window.into(windowFn)
         .triggering(trigger)
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
     BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
     PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
-        stageName + "_exCombineBy",
+        stageName + "exCombineBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                 windowFieldIdx, groupSet)))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
+        .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
+
 
     BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
 
     PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
-        stageName + "_combineBy",
+        stageName + "combineBy",
         Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
                 CalciteUtils.toBeamRecordType(input.getRowType()))))
         .setCoder(KvCoder.of(keyCoder, aggCoder));
 
-    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord",
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
             CalciteUtils.toBeamRecordType(getRowType()), getAggCallList())));
     mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
new file mode 100644
index 0000000..e85368e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Join} node.
+ *
+ * <p>Support for join can be categorized into 3 cases:
+ * <ul>
+ *   <li>BoundedTable JOIN BoundedTable</li>
+ *   <li>UnboundedTable JOIN UnboundedTable</li>
+ *   <li>BoundedTable JOIN UnboundedTable</li>
+ * </ul>
+ *
+ * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both
+ * sides match.
+ *
+ * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
+ * constraints:
+ *
+ * <ul>
+ *   <li>{@code FULL OUTER JOIN} is not supported.</li>
+ *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li>
+ *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li>
+ * </ul>
+ *
+ *
+ * <p>There are also some general constraints:
+ *
+ * <ul>
+ *  <li>Only equi-join is supported.</li>
+ *  <li>CROSS JOIN is not supported.</li>
+ * </ul>
+ */
+public class BeamJoinRel extends Join implements BeamRelNode {
+  public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+    super(cluster, traits, left, right, condition, variablesSet, joinType);
+  }
+
+  @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+      RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+    return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+        joinType);
+  }
+
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv)
+      throws Exception {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType());
+    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    leftRows.setCoder(new BeamSqlRowCoder(leftRowType));
+
+    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType());
+    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    rightRows.setCoder(new BeamSqlRowCoder(rightRowType));
+
+    String stageName = BeamSqlRelUtils.getStageName(this);
+    WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+    WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+
+    // extract the join fields
+    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
+        leftRelNode.getRowType().getFieldCount());
+
+    // build the extract key type
+    // the name of the join field is not important
+    List<String> names = new ArrayList<>(pairs.size());
+    List<Integer> types = new ArrayList<>(pairs.size());
+    for (int i = 0; i < pairs.size(); i++) {
+      names.add("c" + i);
+      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+    }
+    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+
+    Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+
+    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+        .apply(stageName + "_left_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+        .apply(stageName + "_right_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+    // prepare the NullRows
+    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
+    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+
+    // a regular join
+    if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+           || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
+                && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        throw new IllegalArgumentException(
+            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+      }
+
+      return standardJoin(extractedLeftRows, extractedRightRows,
+          leftNullRow, rightNullRow, stageName);
+    } else if (
+        (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
+        && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
+        || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
+            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
+        ) {
+      // if one of the sides is Bounded & the other is Unbounded
+      // then do a sideInput join
+      // when doing a sideInput join, the windowFn does not need to match
+      // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
+      // the unbounded
+      if (joinType == JoinRelType.FULL) {
+        throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
+            + "a bounded table with an unbounded table.");
+      }
+
+      if ((joinType == JoinRelType.LEFT
+          && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
+          || (joinType == JoinRelType.RIGHT
+          && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
+        throw new UnsupportedOperationException(
+            "LEFT side of an OUTER JOIN must be Unbounded table.");
+      }
+
+      return sideInputJoin(extractedLeftRows, extractedRightRows,
+          leftNullRow, rightNullRow);
+    } else {
+      throw new UnsupportedOperationException(
+          "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
+    }
+  }
+
+  private PCollection<BeamSqlRow> standardJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
+    PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
+    switch (joinType) {
+      case LEFT:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
+        break;
+      case RIGHT:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
+        break;
+      case FULL:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
+            rightNullRow);
+        break;
+      case INNER:
+      default:
+        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .innerJoin(extractedLeftRows, extractedRightRows);
+        break;
+    }
+
+    PCollection<BeamSqlRow> ret = joinedRows
+        .apply(stageName + "_JoinParts2WholeRow",
+            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+    return ret;
+  }
+
+  public PCollection<BeamSqlRow> sideInputJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+    // we always make the Unbounded table on the left to do the sideInput join
+    // (will convert the result accordingly before return)
+    boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
+    JoinRelType realJoinType =
+        (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
+        swapped ? extractedRightRows : extractedLeftRows;
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
+        swapped ? extractedLeftRows : extractedRightRows;
+    BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
+
+    // swapped still need to pass down because, we need to swap the result back.
+    return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
+        realRightNullRow, swapped);
+  }
+
+  private PCollection<BeamSqlRow> sideInputJoinHelper(
+      JoinRelType joinType,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
+      BeamSqlRow rightNullRow, boolean swapped) {
+    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
+        .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+
+    PCollection<BeamSqlRow> ret = leftRows
+        .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
+            joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+
+    return ret;
+  }
+
+  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
+    BeamSqlRecordType leftType = CalciteUtils.toBeamRecordType(relNode.getRowType());
+    BeamSqlRow nullRow = new BeamSqlRow(leftType);
+    for (int i = 0; i < leftType.size(); i++) {
+      nullRow.addField(i, null);
+    }
+    return nullRow;
+  }
+
+  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+    // it's a CROSS JOIN because: condition == true
+    if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
+      throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+    }
+
+    RexCall call = (RexCall) condition;
+    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+    if ("AND".equals(call.getOperator().getName())) {
+      List<RexNode> operands = call.getOperands();
+      for (RexNode rexNode : operands) {
+        Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
+        pairs.add(pair);
+      }
+    } else if ("=".equals(call.getOperator().getName())) {
+      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+    } else {
+      throw new UnsupportedOperationException(
+          "Operator " + call.getOperator().getName() + " is not supported in join condition");
+    }
+
+    return pairs;
+  }
+
+  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+      int leftRowColumnCount) {
+    List<RexNode> operands = oneCondition.getOperands();
+    final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
+        ((RexInputRef) operands.get(1)).getIndex());
+
+    final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
+        ((RexInputRef) operands.get(1)).getIndex());
+    final int rightIndex = rightIndex1 - leftRowColumnCount;
+
+    return new Pair<>(leftIndex, rightIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..78253fe
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamJoinRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
+ */
+public class BeamJoinRule extends ConverterRule {
+  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+  private BeamJoinRule() {
+    super(LogicalJoin.class, Convention.NONE,
+        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+  }
+
+  @Override public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+    return new BeamJoinRel(
+        join.getCluster(),
+        join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        convert(join.getLeft(),
+            join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        convert(join.getRight(),
+            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        join.getCondition(),
+        join.getVariablesSet(),
+        join.getJoinType()
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
index 9fc3945..52bd652 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -31,7 +31,7 @@ public abstract class BeamSqlRecordType implements Serializable {
   public abstract List<Integer> getFieldsType();
 
   public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
-    return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
+    return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
   }
 
   public int size() {

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 213dcd5..2d7e350 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -365,6 +365,6 @@ public class BeamSqlRow implements Serializable {
   }
 
   @Override public int hashCode() {
-    return toString().hashCode();
+    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index e86fb3f..d53ba8d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
-
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
 import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -58,7 +57,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
   @Override
   public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
     listCoder.encode(value.getNullFields(), outStream);
-
     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.getNullFields().contains(idx)) {
         continue;
@@ -113,7 +111,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
 
     BeamSqlRow record = new BeamSqlRow(tableSchema);
     record.setNullFields(nullFields);
-
     for (int idx = 0; idx < tableSchema.size(); ++idx) {
       if (nullFields.contains(idx)) {
         continue;

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
new file mode 100644
index 0000000..8169b83
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.Pair;
+
+/**
+ * Collections of {@code PTransform} and {@code DoFn} used to perform JOIN operation.
+ */
+public class BeamJoinTransforms {
+
+  /**
+   * A {@code SimpleFunction} to extract join fields from the specified row.
+   */
+  public static class ExtractJoinFields
+      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    private final boolean isLeft;
+    private final List<Pair<Integer, Integer>> joinColumns;
+
+    public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
+      this.isLeft = isLeft;
+      this.joinColumns = joinColumns;
+    }
+
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      // build the type
+      // the name of the join field is not important
+      List<String> names = new ArrayList<>(joinColumns.size());
+      List<Integer> types = new ArrayList<>(joinColumns.size());
+      for (int i = 0; i < joinColumns.size(); i++) {
+        names.add("c" + i);
+        types.add(isLeft
+            ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
+            input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
+      }
+      BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+
+      // build the row
+      BeamSqlRow row = new BeamSqlRow(type);
+      for (int i = 0; i < joinColumns.size(); i++) {
+        row.addField(i, input
+            .getFieldValue(isLeft ? joinColumns.get(i).getKey() : joinColumns.get(i).getValue()));
+      }
+      return KV.of(row, input);
+    }
+  }
+
+
+  /**
+   * A {@code DoFn} which implement the sideInput-JOIN.
+   */
+  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+    private final JoinRelType joinType;
+    private final BeamSqlRow rightNullRow;
+    private final boolean swap;
+
+    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
+        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+        boolean swap) {
+      this.joinType = joinType;
+      this.rightNullRow = rightNullRow;
+      this.sideInputView = sideInputView;
+      this.swap = swap;
+    }
+
+    @ProcessElement public void processElement(ProcessContext context) {
+      BeamSqlRow key = context.element().getKey();
+      BeamSqlRow leftRow = context.element().getValue();
+      Map<BeamSqlRow, Iterable<BeamSqlRow>> key2Rows = context.sideInput(sideInputView);
+      Iterable<BeamSqlRow> rightRowsIterable = key2Rows.get(key);
+
+      if (rightRowsIterable != null && rightRowsIterable.iterator().hasNext()) {
+        Iterator<BeamSqlRow> it = rightRowsIterable.iterator();
+        while (it.hasNext()) {
+          context.output(combineTwoRowsIntoOne(leftRow, it.next(), swap));
+        }
+      } else {
+        if (joinType == JoinRelType.LEFT) {
+          context.output(combineTwoRowsIntoOne(leftRow, rightNullRow, swap));
+        }
+      }
+    }
+  }
+
+
+  /**
+   * A {@code SimpleFunction} to combine two rows into one.
+   */
+  public static class JoinParts2WholeRow
+      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
+    @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
+      KV<BeamSqlRow, BeamSqlRow> parts = input.getValue();
+      BeamSqlRow leftRow = parts.getKey();
+      BeamSqlRow rightRow = parts.getValue();
+      return combineTwoRowsIntoOne(leftRow, rightRow, false);
+    }
+  }
+
+  /**
+   * As the method name suggests: combine two rows into one wide row.
+   */
+  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
+      BeamSqlRow rightRow, boolean swap) {
+    if (swap) {
+      return combineTwoRowsIntoOneHelper(rightRow, leftRow);
+    } else {
+      return combineTwoRowsIntoOneHelper(leftRow, rightRow);
+    }
+  }
+
+  /**
+   * As the method name suggests: combine two rows into one wide row.
+   */
+  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
+      BeamSqlRow rightRow) {
+    // build the type
+    List<String> names = new ArrayList<>(leftRow.size() + rightRow.size());
+    names.addAll(leftRow.getDataType().getFieldsName());
+    names.addAll(rightRow.getDataType().getFieldsName());
+
+    List<Integer> types = new ArrayList<>(leftRow.size() + rightRow.size());
+    types.addAll(leftRow.getDataType().getFieldsType());
+    types.addAll(rightRow.getDataType().getFieldsType());
+    BeamSqlRecordType type = BeamSqlRecordType.create(names, types);
+
+    BeamSqlRow row = new BeamSqlRow(type);
+    // build the row
+    for (int i = 0; i < leftRow.size(); i++) {
+      row.addField(i, leftRow.getFieldValue(i));
+    }
+
+    for (int i = 0; i < rightRow.size(); i++) {
+      row.addField(i + leftRow.size(), rightRow.getFieldValue(i));
+    }
+
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
new file mode 100644
index 0000000..375027a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Test utilities.
+ */
+public class TestUtils {
+
+  /**
+   * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
+   */
+  public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+    @ProcessElement
+    public void processElement(ProcessContext ctx) {
+      ctx.output(ctx.element().valueInString());
+    }
+  }
+
+  /**
+   * Convert list of {@code BeamSqlRow} to list of {@code String}.
+   */
+  public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+    List<String> strs = new ArrayList<>();
+    for (BeamSqlRow row : rows) {
+      strs.add(row.valueInString());
+    }
+
+    return strs;
+  }
+
+  /**
+   * Convenient way to build a list of {@code BeamSqlRow}s.
+   *
+   * <p>You can use it like this:
+   *
+   * <pre>{@code
+   * TestUtils.RowsBuilder.of(
+   *   Types.INTEGER, "order_id",
+   *   Types.INTEGER, "sum_site_id",
+   *   Types.VARCHAR, "buyer"
+   * ).values(
+   *   1, 3, "james",
+   *   2, 5, "bond"
+   *   ).getStringRows()
+   * }</pre>
+   * {@code}
+   */
+  public static class RowsBuilder {
+    private BeamSqlRecordType type;
+    private List<BeamSqlRow> rows = new ArrayList<>();
+
+    /**
+     * Create a RowsBuilder with the specified row type info.
+     *
+     * <p>Note: check the class javadoc for for detailed example.
+     *
+     * @args pairs of column type and column names.
+     */
+    public static RowsBuilder of(final Object... args) {
+      List<Integer> types = new ArrayList<>();
+      List<String> names = new ArrayList<>();
+      int lastTypeIndex = 0;
+      for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
+        types.add((int) args[lastTypeIndex]);
+        names.add((String) args[lastTypeIndex + 1]);
+      }
+
+      BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types);
+      RowsBuilder builder = new RowsBuilder();
+      builder.type = beamSQLRecordType;
+
+      return builder;
+    }
+
+    /**
+     * Add values to the builder.
+     *
+     * <p>Note: check the class javadoc for for detailed example.
+     */
+    public RowsBuilder values(final Object... args) {
+      int fieldCount = type.size();
+      for (int i = 0; i < args.length; i += fieldCount) {
+        BeamSqlRow row = new BeamSqlRow(type);
+        for (int j = 0; j < fieldCount; j++) {
+          row.addField(j, args[i + j]);
+        }
+        this.rows.add(row);
+      }
+
+      return this;
+    }
+
+    public List<BeamSqlRow> getRows() {
+      return rows;
+    }
+
+    public List<String> getStringRows() {
+      return beamSqlRows2Strings(rows);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index f651f6a..fa80cc1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
 import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
@@ -49,7 +48,6 @@ public class MockedBeamSqlTable extends BaseBeamTable {
   public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
 
   private List<BeamSqlRow> inputRecords;
-
   public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
     super(beamSqlRecordType);
   }
@@ -119,10 +117,11 @@ public class MockedBeamSqlTable extends BaseBeamTable {
 
   @Override
   public BeamIOType getSourceType() {
-    return BeamIOType.UNBOUNDED;
+    return BeamIOType.BOUNDED;
   }
 
   @Override
+
   public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply(
         "MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
new file mode 100644
index 0000000..d096a61
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+
+/**
+ * Base class for mocked table.
+ */
+public abstract class MockedTable extends BaseBeamTable {
+  public static final AtomicInteger COUNTER = new AtomicInteger();
+  public MockedTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
new file mode 100644
index 0000000..3f22df3
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.calcite.util.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A mocked unbounded table.
+ */
+public class MockedUnboundedTable extends MockedTable {
+  private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+  private int timestampField;
+  private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
+    super(beamSqlRecordType);
+  }
+
+  /**
+   * Convenient way to build a mocked table.
+   *
+   * <p>e.g.
+   *
+   * <pre>{@code
+   * MockedUnboundedTable
+   *   .of(Types.BIGINT, "order_id",
+   *       Types.INTEGER, "site_id",
+   *       Types.DOUBLE, "price",
+   *       Types.TIMESTAMP, "order_time")
+   * }</pre>
+   */
+  public static MockedUnboundedTable of(final Object... args){
+    List<Integer> types = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+    int lastTypeIndex = 0;
+    for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
+      types.add((int) args[lastTypeIndex]);
+      names.add((String) args[lastTypeIndex + 1]);
+    }
+
+    return new MockedUnboundedTable(
+        BeamSqlRecordType.create(names, types)
+    );
+  }
+
+  public MockedUnboundedTable timestampColumnIndex(int idx) {
+    this.timestampField = idx;
+    return this;
+  }
+
+  public MockedUnboundedTable addRows(Duration duration, Object... args) {
+    List<BeamSqlRow> rows = new ArrayList<>();
+    int fieldCount = getRecordType().size();
+
+    for (int i = 0; i < args.length; i += fieldCount) {
+      BeamSqlRow row = new BeamSqlRow(getRecordType());
+      for (int j = 0; j < fieldCount; j++) {
+        row.addField(j, args[i + j]);
+      }
+      rows.add(row);
+    }
+
+    // record the watermark + rows
+    this.timestampedRows.add(Pair.of(duration, rows));
+    return this;
+  }
+
+  @Override public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+    TestStream.Builder<BeamSqlRow> values = TestStream.create(
+        new BeamSqlRowCoder(beamSqlRecordType));
+
+    for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+      values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
+      for (int i = 0; i < pair.getValue().size(); i++) {
+        values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
+            new Instant(pair.getValue().get(i).getDate(timestampField))));
+      }
+    }
+
+    return pipeline.begin().apply(
+        "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
+        values.advanceWatermarkToInfinity());
+  }
+
+  @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+    throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!");
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
new file mode 100644
index 0000000..505b742
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Bounded + Bounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelBoundedVsBoundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS",
+        MockedBeamSqlTable
+            .of(SqlTypeName.INTEGER, "order_id",
+                SqlTypeName.INTEGER, "site_id",
+                SqlTypeName.INTEGER, "price",
+
+                1, 2, 3,
+                2, 3, 3,
+                3, 4, 5));
+
+    beamSqlEnv.registerTable("ORDER_DETAILS0",
+        MockedBeamSqlTable
+            .of(SqlTypeName.INTEGER, "order_id0",
+                SqlTypeName.INTEGER, "site_id0",
+                SqlTypeName.INTEGER, "price0",
+
+                1, 2, 3,
+                2, 3, 3,
+                3, 4, 5));
+
+  }
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+        + "FROM ORDER_DETAILS o1"
+        + " JOIN ORDER_DETAILS o2"
+        + " on "
+        + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+        SqlTypeName.INTEGER, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.INTEGER, "price",
+        SqlTypeName.INTEGER, "order_id0",
+        SqlTypeName.INTEGER, "site_id0",
+        SqlTypeName.INTEGER, "price0",
+
+        2, 3, 3, 1, 2, 3
+        ).getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS o1"
+            + " LEFT OUTER JOIN ORDER_DETAILS0 o2"
+            + " on "
+            + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+        SqlTypeName.INTEGER, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.INTEGER, "price",
+        SqlTypeName.INTEGER, "order_id0",
+        SqlTypeName.INTEGER, "site_id0",
+        SqlTypeName.INTEGER, "price0",
+
+        1, 2, 3, null, null, null,
+        2, 3, 3, 1, 2, 3,
+        3, 4, 5, null, null, null
+    ).getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS o1"
+            + " RIGHT OUTER JOIN ORDER_DETAILS o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+        SqlTypeName.INTEGER, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.INTEGER, "price",
+        SqlTypeName.INTEGER, "order_id0",
+        SqlTypeName.INTEGER, "site_id0",
+        SqlTypeName.INTEGER, "price0",
+
+        2, 3, 3, 1, 2, 3,
+        null, null, null, 2, 3, 3,
+        null, null, null, 3, 4, 5
+    ).getInputRecords());
+    pipeline.run();
+  }
+
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS o1"
+            + " FULL OUTER JOIN ORDER_DETAILS o2"
+            + " on "
+            + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+        SqlTypeName.INTEGER, "order_id",
+        SqlTypeName.INTEGER, "site_id",
+        SqlTypeName.INTEGER, "price",
+        SqlTypeName.INTEGER, "order_id0",
+        SqlTypeName.INTEGER, "site_id0",
+        SqlTypeName.INTEGER, "price0",
+
+        2, 3, 3, 1, 2, 3,
+        1, 2, 3, null, null, null,
+        3, 4, 5, null, null, null,
+        null, null, null, 2, 3, 3,
+        null, null, null, 3, 4, 5
+    ).getInputRecords());
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testException_nonEqualJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS o1"
+            + " JOIN ORDER_DETAILS o2"
+            + " on "
+            + " o1.order_id>o2.site_id"
+        ;
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testException_crossJoin() throws Exception {
+    String sql =
+        "SELECT *  "
+            + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2";
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
new file mode 100644
index 0000000..2ddb00b
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsBoundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+  public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+        .of(
+            Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.TIMESTAMP, "order_time"
+        )
+        .timestampColumnIndex(3)
+        .addRows(
+            Duration.ZERO,
+            1, 1, 1, FIRST_DATE,
+            1, 2, 2, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(Duration.standardSeconds(1)),
+            2, 2, 3, SECOND_DATE,
+            2, 3, 3, SECOND_DATE,
+            // this late data is omitted
+            1, 2, 3, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
+            3, 3, 3, THIRD_DATE,
+            // this late data is omitted
+            2, 2, 3, SECOND_DATE
+        )
+    );
+
+    beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable
+        .of(SqlTypeName.INTEGER, "order_id",
+            SqlTypeName.VARCHAR, "buyer",
+
+            1, "james",
+            2, "bond"
+        ));
+  }
+
+  @Test
+  public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).values(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).values(
+                1, 3, "james",
+                2, 5, "bond"
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).values(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testLeftOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " RIGHT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.VARCHAR, "buyer"
+            ).values(
+                1, 3, "james",
+                2, 5, "bond",
+                3, 3, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRightOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + " ORDER_DETAILS1 o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testFullOuterJoinError() throws Exception {
+    String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+        + " ORDER_DETAILS1 o2 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
new file mode 100644
index 0000000..18a5f60
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsUnboundedTest {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+  private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+  public static final Date FIRST_DATE = new Date(1);
+  public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+
+  private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+  @BeforeClass
+  public static void prepare() {
+    beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+        .of(Types.INTEGER, "order_id",
+            Types.INTEGER, "site_id",
+            Types.INTEGER, "price",
+            Types.TIMESTAMP, "order_time"
+        )
+        .timestampColumnIndex(3)
+        .addRows(
+            Duration.ZERO,
+            1, 1, 1, FIRST_DATE,
+            1, 2, 6, FIRST_DATE
+        )
+        .addRows(
+            WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+            2, 2, 7, SECOND_DATE,
+            2, 3, 8, SECOND_DATE,
+            // this late record is omitted(First window)
+            1, 3, 3, FIRST_DATE
+        )
+        .addRows(
+            // this late record is omitted(Second window)
+            WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+            2, 3, 3, SECOND_DATE
+        )
+    );
+  }
+
+  @Test
+  public void testInnerJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0").values(
+                1, 3, 1, 3,
+                2, 5, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    // 1, 1 | 1, 3
+    // 2, 2 | NULL, NULL
+    // ---- | -----
+    // 2, 2 | 2, 5
+    // 3, 3 | NULL, NULL
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 1, 1, 3,
+                2, 2, null, null,
+                2, 2, 2, 5,
+                3, 3, null, null
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testRightOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " RIGHT OUTER JOIN "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id0",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 3, 1, 1,
+                null, null, 2, 2,
+                2, 5, 2, 2,
+                null, null, 3, 3
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+        + " FULL OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id1=o2.order_id"
+        ;
+
+    PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
+    PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(
+            TestUtils.RowsBuilder.of(
+                Types.INTEGER, "order_id1",
+                Types.INTEGER, "sum_site_id",
+                Types.INTEGER, "order_id",
+                Types.INTEGER, "sum_site_id0"
+            ).values(
+                1, 1, 1, 3,
+                6, 2, null, null,
+                7, 2, null, null,
+                8, 3, null, null,
+                null, null, 2, 5
+            ).getStringRows()
+        );
+    pipeline.run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWindowsMismatch() throws Exception {
+    String sql = "SELECT * FROM "
+        + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
+        + " LEFT OUTER JOIN "
+        + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+        + "          GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+        + " on "
+        + " o1.order_id=o2.order_id"
+        ;
+    pipeline.enableAbandonedNodeEnforcement(false);
+    BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index b358fe1..f8eaa51 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -59,7 +59,7 @@ public class BeamSqlRowCoderTest {
 
     BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
         protoRowType.apply(new JavaTypeFactoryImpl(
-        RelDataTypeSystem.DEFAULT)));
+            RelDataTypeSystem.DEFAULT)));
     BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
     row.addField("col_tinyint", Byte.valueOf("1"));
     row.addField("col_smallint", Short.valueOf("1"));


[2/2] beam git commit: [BEAM-2193] This closes #3277

Posted by ta...@apache.org.
[BEAM-2193] This closes #3277


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2096da25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2096da25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2096da25

Branch: refs/heads/DSL_SQL
Commit: 2096da25e85d97ab52850453c2130ff706d7bcdf
Parents: ab4b118 928cec5
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Jun 29 16:34:45 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 29 16:34:45 2017 -0700

----------------------------------------------------------------------
 dsls/pom.xml                                    |   2 +-
 dsls/sql/pom.xml                                |  16 +-
 .../beam/dsls/sql/planner/BeamRuleSets.java     |   6 +-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  19 +-
 .../apache/beam/dsls/sql/rel/BeamJoinRel.java   | 305 +++++++++++++++++++
 .../apache/beam/dsls/sql/rule/BeamJoinRule.java |  53 ++++
 .../beam/dsls/sql/schema/BeamSqlRecordType.java |   2 +-
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   2 +-
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |   3 -
 .../dsls/sql/transform/BeamJoinTransforms.java  | 166 ++++++++++
 .../org/apache/beam/dsls/sql/TestUtils.java     | 125 ++++++++
 .../dsls/sql/planner/MockedBeamSqlTable.java    |   5 +-
 .../beam/dsls/sql/planner/MockedTable.java      |  33 ++
 .../dsls/sql/planner/MockedUnboundedTable.java  | 120 ++++++++
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    | 195 ++++++++++++
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  | 242 +++++++++++++++
 .../BeamJoinRelUnboundedVsUnboundedTest.java    | 219 +++++++++++++
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   2 +-
 18 files changed, 1486 insertions(+), 29 deletions(-)
----------------------------------------------------------------------