You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/06/29 23:35:40 UTC
[1/2] beam git commit: [BEAM-2193] Implement FULL, INNER,
and OUTER JOIN: - FULL and INNER supported on all variations of
unbounded/bounded joins. - OUTER JOIN supported when outer side is unbounded.
- Unbounded/bounded joins implemented via side inputs.
Repository: beam
Updated Branches:
refs/heads/DSL_SQL ab4b11886 -> 2096da25e
[BEAM-2193] Implement FULL, INNER, and OUTER JOIN:
- FULL and INNER supported on all variations of unbounded/bounded joins.
- OUTER JOIN supported when outer side is unbounded.
- Unbounded/bounded joins implemented via side inputs.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/928cec59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/928cec59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/928cec59
Branch: refs/heads/DSL_SQL
Commit: 928cec597175c363d444331b35ac8793297a242b
Parents: ab4b118
Author: James Xu <xu...@gmail.com>
Authored: Mon May 29 11:11:34 2017 +0800
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 29 16:32:23 2017 -0700
----------------------------------------------------------------------
dsls/pom.xml | 2 +-
dsls/sql/pom.xml | 16 +-
.../beam/dsls/sql/planner/BeamRuleSets.java | 6 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 19 +-
.../apache/beam/dsls/sql/rel/BeamJoinRel.java | 305 +++++++++++++++++++
.../apache/beam/dsls/sql/rule/BeamJoinRule.java | 53 ++++
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 2 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 2 +-
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 3 -
.../dsls/sql/transform/BeamJoinTransforms.java | 166 ++++++++++
.../org/apache/beam/dsls/sql/TestUtils.java | 125 ++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 5 +-
.../beam/dsls/sql/planner/MockedTable.java | 33 ++
.../dsls/sql/planner/MockedUnboundedTable.java | 120 ++++++++
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 195 ++++++++++++
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 242 +++++++++++++++
.../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++++++
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +-
18 files changed, 1486 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/pom.xml b/dsls/pom.xml
index d932698..a518d03 100644
--- a/dsls/pom.xml
+++ b/dsls/pom.xml
@@ -66,7 +66,7 @@
</plugin>
</plugins>
</pluginManagement>
-
+
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index a2279d5..54f590e 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -157,6 +157,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.9.0.1</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -193,21 +198,18 @@
<artifactId>joda-time</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.10.1.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
index 6c73558..552ff8f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamRuleSets.java
@@ -19,15 +19,14 @@ package org.apache.beam.dsls.sql.planner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-
import java.util.Iterator;
-
import org.apache.beam.dsls.sql.rel.BeamRelNode;
import org.apache.beam.dsls.sql.rule.BeamAggregationRule;
import org.apache.beam.dsls.sql.rule.BeamFilterRule;
import org.apache.beam.dsls.sql.rule.BeamIOSinkRule;
import org.apache.beam.dsls.sql.rule.BeamIOSourceRule;
import org.apache.beam.dsls.sql.rule.BeamIntersectRule;
+import org.apache.beam.dsls.sql.rule.BeamJoinRule;
import org.apache.beam.dsls.sql.rule.BeamMinusRule;
import org.apache.beam.dsls.sql.rule.BeamProjectRule;
import org.apache.beam.dsls.sql.rule.BeamSortRule;
@@ -47,7 +46,8 @@ public class BeamRuleSets {
.<RelOptRule>builder().add(BeamIOSourceRule.INSTANCE, BeamProjectRule.INSTANCE,
BeamFilterRule.INSTANCE, BeamIOSinkRule.INSTANCE,
BeamAggregationRule.INSTANCE, BeamSortRule.INSTANCE, BeamValuesRule.INSTANCE,
- BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE)
+ BeamIntersectRule.INSTANCE, BeamMinusRule.INSTANCE, BeamUnionRule.INSTANCE,
+ BeamJoinRule.INSTANCE)
.build();
public static RuleSet[] getRuleSets() {
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 701f620..9ec9e9f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -74,40 +74,41 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
, BeamSqlEnv sqlEnv) throws Exception {
RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
+ String stageName = BeamSqlRelUtils.getStageName(this) + "_";
PCollection<BeamSqlRow> upstream =
BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
if (windowFieldIdx != -1) {
- upstream = upstream.apply(stageName + "_assignEventTimestamp", WithTimestamps
- .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+ upstream = upstream.apply(stageName + "assignEventTimestamp", WithTimestamps
+ .of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
.setCoder(upstream.getCoder());
}
- PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "_window",
- Window.<BeamSqlRow>into(windowFn)
+ PCollection<BeamSqlRow> windowStream = upstream.apply(stageName + "window",
+ Window.into(windowFn)
.triggering(trigger)
.withAllowedLateness(allowedLatence)
.accumulatingFiredPanes());
BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply(
- stageName + "_exCombineBy",
+ stageName + "exCombineBy",
WithKeys
.of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
windowFieldIdx, groupSet)))
- .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
+ .setCoder(KvCoder.of(keyCoder, upstream.getCoder()));
+
BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
- stageName + "_combineBy",
+ stageName + "combineBy",
Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
CalciteUtils.toBeamRecordType(input.getRowType()))))
.setCoder(KvCoder.of(keyCoder, aggCoder));
- PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord",
+ PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord",
ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
CalciteUtils.toBeamRecordType(getRowType()), getAggCallList())));
mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
new file mode 100644
index 0000000..e85368e
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamJoinRel.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.dsls.sql.transform.BeamJoinTransforms;
+import org.apache.beam.dsls.sql.utils.CalciteUtils;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Pair;
+
+/**
+ * {@code BeamRelNode} to replace a {@code Join} node.
+ *
+ * <p>Support for join can be categorized into 3 cases:
+ * <ul>
+ * <li>BoundedTable JOIN BoundedTable</li>
+ * <li>UnboundedTable JOIN UnboundedTable</li>
+ * <li>BoundedTable JOIN UnboundedTable</li>
+ * </ul>
+ *
+ * <p>For the first two cases, a standard join is utilized as long as the windowFns of both
+ * sides match.
+ *
+ * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
+ * constraints:
+ *
+ * <ul>
+ * <li>{@code FULL OUTER JOIN} is not supported.</li>
+ * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should be on the left side.</li>
+ * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should be on the right side.</li>
+ * </ul>
+ *
+ *
+ * <p>There are also some general constraints:
+ *
+ * <ul>
+ * <li>Only equi-join is supported.</li>
+ * <li>CROSS JOIN is not supported.</li>
+ * </ul>
+ */
+public class BeamJoinRel extends Join implements BeamRelNode {
+ /**
+ * Creates a join node over the {@code left} and {@code right} inputs.
+ *
+ * <p>All arguments are forwarded unchanged to the constructor of the Calcite {@code Join}
+ * super class.
+ */
+ public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+ super(cluster, traits, left, right, condition, variablesSet, joinType);
+ }
+
+ /**
+ * Returns a new {@code BeamJoinRel} built from the given trait set, condition, inputs and join
+ * type, reusing the cluster and {@code variablesSet} of this node.
+ *
+ * <p>{@code semiJoinDone} is not used by this implementation.
+ */
+ @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
+ RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+ return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
+ joinType);
+ }
+
+  /**
+   * Builds the Beam pipeline for this join on top of the pipelines of its two inputs.
+   *
+   * <p>Both inputs are first keyed by their join columns. Inputs of equal boundedness are joined
+   * with a standard join, which requires compatible {@code WindowFn}s; when exactly one input is
+   * bounded, the join is performed with a side input instead.
+   *
+   * @param inputPCollections passed through to the pipelines of both inputs
+   * @param sqlEnv passed through to the pipelines of both inputs
+   * @return the joined rows
+   * @throws IllegalArgumentException if a standard join is needed but the {@code WindowFn}s of the
+   *     two inputs are not compatible
+   * @throws UnsupportedOperationException if the join condition is not supported, or if a join of
+   *     a bounded with an unbounded input is a FULL OUTER JOIN or has a bounded outer side
+   */
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
+      BeamSqlEnv sqlEnv)
+      throws Exception {
+    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
+    BeamSqlRecordType leftRowType = CalciteUtils.toBeamRecordType(left.getRowType());
+    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    leftRows.setCoder(new BeamSqlRowCoder(leftRowType));
+
+    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+    BeamSqlRecordType rightRowType = CalciteUtils.toBeamRecordType(right.getRowType());
+    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    rightRows.setCoder(new BeamSqlRowCoder(rightRowType));
+
+    String stageName = BeamSqlRelUtils.getStageName(this);
+
+    // extract the join fields
+    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
+        leftRelNode.getRowType().getFieldCount());
+
+    // build the type of the extracted key; the key columns take the types of the left-side join
+    // columns, and the names of the key fields are not important
+    List<String> names = new ArrayList<>(pairs.size());
+    List<Integer> types = new ArrayList<>(pairs.size());
+    for (int i = 0; i < pairs.size(); i++) {
+      names.add("c" + i);
+      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
+    }
+    BeamSqlRecordType extractKeyRowType = BeamSqlRecordType.create(names, types);
+
+    Coder<BeamSqlRow> extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
+
+    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
+        .apply(stageName + "_left_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
+        .apply(stageName + "_right_ExtractJoinFields",
+            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
+        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
+
+    // prepare the NullRows
+    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
+    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
+
+    boolean leftBounded = leftRows.isBounded() == PCollection.IsBounded.BOUNDED;
+    boolean rightBounded = rightRows.isBounded() == PCollection.IsBounded.BOUNDED;
+
+    if (leftBounded == rightBounded) {
+      // bounded-vs-bounded or unbounded-vs-unbounded: a regular join
+      WindowFn<?, ?> leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
+      WindowFn<?, ?> rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
+      try {
+        leftWinFn.verifyCompatibility(rightWinFn);
+      } catch (IncompatibleWindowException e) {
+        throw new IllegalArgumentException(
+            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
+      }
+
+      return standardJoin(extractedLeftRows, extractedRightRows,
+          leftNullRow, rightNullRow, stageName);
+    }
+
+    // Exactly one of the sides is bounded: do a sideInput join with the bounded side as the
+    // side input. The windowFns do not need to match in this case.
+    // Only INNER JOIN and LEFT/RIGHT OUTER JOIN whose outer side is the unbounded input are
+    // supported.
+    if (joinType == JoinRelType.FULL) {
+      throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
+          + "a bounded table with an unbounded table.");
+    }
+
+    if ((joinType == JoinRelType.LEFT && leftBounded)
+        || (joinType == JoinRelType.RIGHT && rightBounded)) {
+      throw new UnsupportedOperationException(
+          "The outer side of a " + joinType + " OUTER JOIN must be the unbounded table.");
+    }
+
+    return sideInputJoin(extractedLeftRows, extractedRightRows,
+        leftNullRow, rightNullRow);
+  }
+
+  /**
+   * Joins the two keyed inputs with the join-library transform that matches {@code joinType}
+   * and merges every joined pair of rows into one wide row.
+   *
+   * <p>{@code leftNullRow} and {@code rightNullRow} are handed to the outer-join transforms;
+   * any join type other than LEFT, RIGHT or FULL is executed as an inner join.
+   */
+  private PCollection<BeamSqlRow> standardJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
+    final PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedPairs;
+    switch (joinType) {
+      case FULL:
+        joinedPairs = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
+                rightNullRow);
+        break;
+      case RIGHT:
+        joinedPairs = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
+        break;
+      case LEFT:
+        joinedPairs = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
+        break;
+      case INNER:
+      default:
+        joinedPairs = org.apache.beam.sdk.extensions.joinlibrary.Join
+            .innerJoin(extractedLeftRows, extractedRightRows);
+        break;
+    }
+
+    // KV<key, KV<leftRow, rightRow>> -> one wide row, encoded with this node's row type
+    return joinedPairs
+        .apply(stageName + "_JoinParts2WholeRow",
+            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+  }
+
+  /**
+   * Joins the two keyed inputs via a side input.
+   *
+   * <p>The helper always receives the unbounded input as its left (main) input. When the left
+   * input of this join is the bounded one, the two inputs are swapped, any non-INNER join type
+   * is turned into a LEFT join, and the helper is told to swap the result columns back.
+   */
+  public PCollection<BeamSqlRow> sideInputJoin(
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
+      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
+    if (extractedLeftRows.isBounded() != PCollection.IsBounded.BOUNDED) {
+      // the left input is not the bounded one: join as declared, nothing to swap
+      return sideInputJoinHelper(joinType, extractedLeftRows, extractedRightRows,
+          rightNullRow, false);
+    }
+
+    // the left input is bounded: it becomes the side input, so the sides are exchanged and
+    // the helper has to swap the result back
+    JoinRelType swappedJoinType =
+        (joinType == JoinRelType.INNER) ? joinType : JoinRelType.LEFT;
+    return sideInputJoinHelper(swappedJoinType, extractedRightRows, extractedLeftRows,
+        leftNullRow, true);
+  }
+
+  /**
+   * Turns {@code rightRows} into a multimap side input and joins {@code leftRows} against it
+   * with a {@code SideInputJoinDoFn}; the output is encoded with this node's row type.
+   */
+  private PCollection<BeamSqlRow> sideInputJoinHelper(
+      JoinRelType joinType,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
+      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
+      BeamSqlRow rightNullRow, boolean swapped) {
+    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> lookupView =
+        rightRows.apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
+
+    BeamJoinTransforms.SideInputJoinDoFn joinFn =
+        new BeamJoinTransforms.SideInputJoinDoFn(joinType, rightNullRow, lookupView, swapped);
+
+    PCollection<BeamSqlRow> joinedRows =
+        leftRows.apply(ParDo.of(joinFn).withSideInputs(lookupView));
+    return joinedRows
+        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType())));
+  }
+
+  /**
+   * Builds a row of the row type of {@code relNode} in which every field is set to
+   * {@code null}.
+   */
+  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
+    BeamSqlRecordType rowType = CalciteUtils.toBeamRecordType(relNode.getRowType());
+    BeamSqlRow allNulls = new BeamSqlRow(rowType);
+    final int fieldCount = rowType.size();
+    for (int idx = 0; idx < fieldCount; idx++) {
+      allNulls.addField(idx, null);
+    }
+    return allNulls;
+  }
+
+  /**
+   * Extracts the pairs of join columns from the join condition.
+   *
+   * <p>The condition must be either a single {@code =} comparison or an {@code AND} whose
+   * operands are all {@code =} comparisons; anything else is rejected instead of being
+   * silently treated as an equality.
+   *
+   * @param leftRowColumnCount the number of columns of the left input
+   * @return one pair per comparison: the column index within the left input and the column
+   *     index within the right input
+   * @throws UnsupportedOperationException if the join is a CROSS JOIN or the condition is not a
+   *     conjunction of {@code =} comparisons
+   */
+  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
+    // it's a CROSS JOIN because: condition == true
+    if (condition instanceof RexLiteral && condition.isAlwaysTrue()) {
+      throw new UnsupportedOperationException("CROSS JOIN is not supported!");
+    }
+
+    // e.g. a literal FALSE or a bare boolean column: not an equi-join
+    if (!(condition instanceof RexCall)) {
+      throw new UnsupportedOperationException(
+          "Join condition " + condition + " is not supported, only equi-join is supported");
+    }
+
+    RexCall call = (RexCall) condition;
+    String operatorName = call.getOperator().getName();
+    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
+    if ("AND".equals(operatorName)) {
+      for (RexNode operand : call.getOperands()) {
+        // every conjunct has to be an equality itself, otherwise e.g. "a.x > b.y" would be
+        // joined as if it were "a.x = b.y"
+        if (!(operand instanceof RexCall)
+            || !"=".equals(((RexCall) operand).getOperator().getName())) {
+          throw new UnsupportedOperationException(
+              "Join condition " + operand + " is not supported, only equi-join is supported");
+        }
+        pairs.add(extractOneJoinColumn((RexCall) operand, leftRowColumnCount));
+      }
+    } else if ("=".equals(operatorName)) {
+      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
+    } else {
+      throw new UnsupportedOperationException(
+          "Operator " + operatorName + " is not supported in join condition");
+    }
+
+    return pairs;
+  }
+
+  /**
+   * Extracts the pair of join columns from a single comparison of the join condition.
+   *
+   * <p>The operands index into the concatenation of the left and the right row, so the smaller
+   * index has to fall into the left input and the larger one, reduced by
+   * {@code leftRowColumnCount}, into the right input.
+   *
+   * @param oneCondition a comparison between two input columns
+   * @param leftRowColumnCount the number of columns of the left input
+   * @return the column index within the left input and the column index within the right input
+   * @throws UnsupportedOperationException if the operands are not two plain column references,
+   *     or if both columns belong to the same input
+   */
+  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
+      int leftRowColumnCount) {
+    List<RexNode> operands = oneCondition.getOperands();
+    if (operands.size() != 2
+        || !(operands.get(0) instanceof RexInputRef)
+        || !(operands.get(1) instanceof RexInputRef)) {
+      throw new UnsupportedOperationException("Join condition " + oneCondition
+          + " is not supported, both operands must be plain column references");
+    }
+
+    final int firstIndex = ((RexInputRef) operands.get(0)).getIndex();
+    final int secondIndex = ((RexInputRef) operands.get(1)).getIndex();
+    final int leftIndex = Math.min(firstIndex, secondIndex);
+    final int rightIndex = Math.max(firstIndex, secondIndex) - leftRowColumnCount;
+
+    // both columns from the right input (leftIndex too large) or both from the left input
+    // (rightIndex negative) is not a join between the two inputs
+    if (leftIndex >= leftRowColumnCount || rightIndex < 0) {
+      throw new UnsupportedOperationException("Join condition " + oneCondition
+          + " is not supported, it must compare a column of the left input with a column of"
+          + " the right input");
+    }
+
+    return new Pair<>(leftIndex, rightIndex);
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
new file mode 100644
index 0000000..78253fe
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rule;
+
+import org.apache.beam.dsls.sql.rel.BeamJoinRel;
+import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.logical.LogicalJoin;
+
+/**
+ * A {@code ConverterRule} which replaces a {@code LogicalJoin} with a {@code BeamJoinRel}.
+ */
+public class BeamJoinRule extends ConverterRule {
+  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
+
+  private BeamJoinRule() {
+    super(LogicalJoin.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamJoinRule");
+  }
+
+  /**
+   * Converts the given join, and both of its inputs, to the Beam logical convention; the
+   * condition, variables set and join type are carried over unchanged.
+   */
+  @Override public RelNode convert(RelNode rel) {
+    Join join = (Join) rel;
+
+    RelNode originalLeft = join.getLeft();
+    RelNode beamLeft = convert(originalLeft,
+        originalLeft.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    RelNode originalRight = join.getRight();
+    RelNode beamRight = convert(originalRight,
+        originalRight.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
+
+    return new BeamJoinRel(
+        join.getCluster(),
+        join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
+        beamLeft,
+        beamRight,
+        join.getCondition(),
+        join.getVariablesSet(),
+        join.getJoinType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
index 9fc3945..52bd652 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -31,7 +31,7 @@ public abstract class BeamSqlRecordType implements Serializable {
public abstract List<Integer> getFieldsType();
public static BeamSqlRecordType create(List<String> fieldNames, List<Integer> fieldTypes) {
- return new AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
+ return new org.apache.beam.dsls.sql.schema.AutoValue_BeamSqlRecordType(fieldNames, fieldTypes);
}
public int size() {
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 213dcd5..2d7e350 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -365,6 +365,6 @@ public class BeamSqlRow implements Serializable {
}
@Override public int hashCode() {
- return toString().hashCode();
+ return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index e86fb3f..d53ba8d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
-
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
@@ -58,7 +57,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
@Override
public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
listCoder.encode(value.getNullFields(), outStream);
-
for (int idx = 0; idx < value.size(); ++idx) {
if (value.getNullFields().contains(idx)) {
continue;
@@ -113,7 +111,6 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
BeamSqlRow record = new BeamSqlRow(tableSchema);
record.setNullFields(nullFields);
-
for (int idx = 0; idx < tableSchema.size(); ++idx) {
if (nullFields.contains(idx)) {
continue;
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
new file mode 100644
index 0000000..8169b83
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamJoinTransforms.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.Pair;
+
+/**
+ * Collection of {@code PTransform}s and {@code DoFn}s used to perform JOIN operations.
+ */
+public class BeamJoinTransforms {
+
+  /**
+   * A {@code SimpleFunction} which keys a row by the values of its join columns.
+   *
+   * <p>For a left-side row the left index of every column pair is used, for a right-side row
+   * the right index.
+   */
+  public static class ExtractJoinFields
+      extends SimpleFunction<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> {
+    private final boolean isLeft;
+    private final List<Pair<Integer, Integer>> joinColumns;
+
+    public ExtractJoinFields(boolean isLeft, List<Pair<Integer, Integer>> joinColumns) {
+      this.isLeft = isLeft;
+      this.joinColumns = joinColumns;
+    }
+
+    @Override public KV<BeamSqlRow, BeamSqlRow> apply(BeamSqlRow input) {
+      final int keySize = joinColumns.size();
+
+      // type of the key row; the names of the key fields are not important
+      List<String> keyNames = new ArrayList<>(keySize);
+      List<Integer> keyTypes = new ArrayList<>(keySize);
+      for (int pos = 0; pos < keySize; pos++) {
+        keyNames.add("c" + pos);
+        keyTypes.add(input.getDataType().getFieldsType().get(sourceIndex(pos)));
+      }
+
+      // values of the key row
+      BeamSqlRow keyRow = new BeamSqlRow(BeamSqlRecordType.create(keyNames, keyTypes));
+      for (int pos = 0; pos < keySize; pos++) {
+        keyRow.addField(pos, input.getFieldValue(sourceIndex(pos)));
+      }
+      return KV.of(keyRow, input);
+    }
+
+    /** Index, within the input row, of the join column at the given key position. */
+    private int sourceIndex(int position) {
+      Pair<Integer, Integer> columnPair = joinColumns.get(position);
+      return isLeft ? columnPair.getKey() : columnPair.getValue();
+    }
+  }
+
+
+  /**
+   * A {@code DoFn} which implements the sideInput-JOIN.
+   *
+   * <p>Every main-input row is looked up by its key in the side-input multimap and combined
+   * with each row found there. If nothing is found and the join type is {@code LEFT}, the row
+   * is combined with {@code rightNullRow} instead; for other join types it is dropped.
+   */
+  public static class SideInputJoinDoFn extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView;
+    private final JoinRelType joinType;
+    private final BeamSqlRow rightNullRow;
+    private final boolean swap;
+
+    public SideInputJoinDoFn(JoinRelType joinType, BeamSqlRow rightNullRow,
+        PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> sideInputView,
+        boolean swap) {
+      this.joinType = joinType;
+      this.rightNullRow = rightNullRow;
+      this.sideInputView = sideInputView;
+      this.swap = swap;
+    }
+
+    @ProcessElement public void processElement(ProcessContext context) {
+      KV<BeamSqlRow, BeamSqlRow> keyedRow = context.element();
+      BeamSqlRow joinKey = keyedRow.getKey();
+      BeamSqlRow mainRow = keyedRow.getValue();
+      Iterable<BeamSqlRow> matches = context.sideInput(sideInputView).get(joinKey);
+
+      boolean matched = false;
+      if (matches != null) {
+        for (BeamSqlRow match : matches) {
+          matched = true;
+          context.output(combineTwoRowsIntoOne(mainRow, match, swap));
+        }
+      }
+
+      // no partner in the side input: only a LEFT join still emits the row
+      if (!matched && joinType == JoinRelType.LEFT) {
+        context.output(combineTwoRowsIntoOne(mainRow, rightNullRow, swap));
+      }
+    }
+  }
+
+
+  /**
+   * A {@code SimpleFunction} which merges the two rows of a joined pair into one wide row.
+   */
+  public static class JoinParts2WholeRow
+      extends SimpleFunction<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>, BeamSqlRow> {
+    @Override public BeamSqlRow apply(KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>> input) {
+      // the outer key is not needed any more; only the pair of joined rows is used
+      KV<BeamSqlRow, BeamSqlRow> joinedPair = input.getValue();
+      return combineTwoRowsIntoOne(joinedPair.getKey(), joinedPair.getValue(), false);
+    }
+  }
+
+  /**
+   * Combines two rows into one wide row; when {@code swap} is set, the fields of
+   * {@code rightRow} come first.
+   */
+  private static BeamSqlRow combineTwoRowsIntoOne(BeamSqlRow leftRow,
+      BeamSqlRow rightRow, boolean swap) {
+    BeamSqlRow leading = swap ? rightRow : leftRow;
+    BeamSqlRow trailing = swap ? leftRow : rightRow;
+    return combineTwoRowsIntoOneHelper(leading, trailing);
+  }
+
+  /**
+   * Builds one wide row which holds the fields of {@code leftRow} followed by the fields of
+   * {@code rightRow}; its type is the concatenation of the field names and types of both rows.
+   */
+  private static BeamSqlRow combineTwoRowsIntoOneHelper(BeamSqlRow leftRow,
+      BeamSqlRow rightRow) {
+    final int leftSize = leftRow.size();
+    final int rightSize = rightRow.size();
+    BeamSqlRecordType leftType = leftRow.getDataType();
+    BeamSqlRecordType rightType = rightRow.getDataType();
+
+    // the type of the wide row
+    List<String> wideNames = new ArrayList<>(leftSize + rightSize);
+    wideNames.addAll(leftType.getFieldsName());
+    wideNames.addAll(rightType.getFieldsName());
+
+    List<Integer> wideTypes = new ArrayList<>(leftSize + rightSize);
+    wideTypes.addAll(leftType.getFieldsType());
+    wideTypes.addAll(rightType.getFieldsType());
+
+    // the values of the wide row: the left fields first, the right fields shifted behind them
+    BeamSqlRow wideRow = new BeamSqlRow(BeamSqlRecordType.create(wideNames, wideTypes));
+    for (int idx = 0; idx < leftSize; idx++) {
+      wideRow.addField(idx, leftRow.getFieldValue(idx));
+    }
+    for (int idx = 0; idx < rightSize; idx++) {
+      wideRow.addField(leftSize + idx, rightRow.getFieldValue(idx));
+    }
+
+    return wideRow;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
new file mode 100644
index 0000000..375027a
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/TestUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Test utilities.
+ */
+public class TestUtils {
+
+ /**
+ * A {@code DoFn} to convert a {@code BeamSqlRow} to a comparable {@code String}.
+ */
+ public static class BeamSqlRow2StringDoFn extends DoFn<BeamSqlRow, String> {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ ctx.output(ctx.element().valueInString());
+ }
+ }
+
+ /**
+ * Convert list of {@code BeamSqlRow} to list of {@code String}.
+ */
+ public static List<String> beamSqlRows2Strings(List<BeamSqlRow> rows) {
+ List<String> strs = new ArrayList<>();
+ for (BeamSqlRow row : rows) {
+ strs.add(row.valueInString());
+ }
+
+ return strs;
+ }
+
+ /**
+ * Convenient way to build a list of {@code BeamSqlRow}s.
+ *
+ * <p>You can use it like this:
+ *
+ * <pre>{@code
+ * TestUtils.RowsBuilder.of(
+ * Types.INTEGER, "order_id",
+ * Types.INTEGER, "sum_site_id",
+ * Types.VARCHAR, "buyer"
+ * ).values(
+ * 1, 3, "james",
+ * 2, 5, "bond"
+ * ).getStringRows()
+ * }</pre>
+ * <p>Use {@code getRows()} instead of {@code getStringRows()} to get the rows themselves.
+ */
+ public static class RowsBuilder {
+ private BeamSqlRecordType type;
+ private List<BeamSqlRow> rows = new ArrayList<>();
+
+ /**
+ * Create a RowsBuilder with the specified row type info.
+ *
+ * <p>Note: check the class javadoc for a detailed example.
+ *
+ * @param args alternating column type (int) and column name (String), in that order.
+ */
+ public static RowsBuilder of(final Object... args) {
+ List<Integer> types = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+ int lastTypeIndex = 0;
+ for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
+ types.add((int) args[lastTypeIndex]);
+ names.add((String) args[lastTypeIndex + 1]);
+ }
+
+ BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.create(names, types);
+ RowsBuilder builder = new RowsBuilder();
+ builder.type = beamSQLRecordType;
+
+ return builder;
+ }
+
+ /**
+ * Add values to the builder; every N consecutive values (N = column count) form one row.
+ *
+ * <p>Note: check the class javadoc for a detailed example.
+ */
+ public RowsBuilder values(final Object... args) {
+ int fieldCount = type.size();
+ for (int i = 0; i < args.length; i += fieldCount) {
+ BeamSqlRow row = new BeamSqlRow(type);
+ for (int j = 0; j < fieldCount; j++) {
+ row.addField(j, args[i + j]);
+ }
+ this.rows.add(row);
+ }
+
+ return this;
+ }
+
+ public List<BeamSqlRow> getRows() {
+ return rows;
+ }
+
+ public List<String> getStringRows() {
+ return beamSqlRows2Strings(rows);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
index f651f6a..fa80cc1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamIOType;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
@@ -49,7 +48,6 @@ public class MockedBeamSqlTable extends BaseBeamTable {
public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>();
private List<BeamSqlRow> inputRecords;
-
public MockedBeamSqlTable(BeamSqlRecordType beamSqlRecordType) {
super(beamSqlRecordType);
}
@@ -119,10 +117,11 @@ public class MockedBeamSqlTable extends BaseBeamTable {
@Override
public BeamIOType getSourceType() {
- return BeamIOType.UNBOUNDED;
+ return BeamIOType.BOUNDED;
}
@Override
+
public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
return PBegin.in(pipeline).apply(
"MockedBeamSQLTable_Reader_" + COUNTER.incrementAndGet(), Create.of(inputRecords));
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
new file mode 100644
index 0000000..d096a61
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedTable.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+
+/**
+ * Base class for mocked tables; holds a static {@code COUNTER} shared by all subclasses.
+ */
+public abstract class MockedTable extends BaseBeamTable {
+ public static final AtomicInteger COUNTER = new AtomicInteger();
+ public MockedTable(BeamSqlRecordType beamSqlRecordType) {
+ super(beamSqlRecordType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
new file mode 100644
index 0000000..3f22df3
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedUnboundedTable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamIOType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.calcite.util.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A mocked unbounded table whose reader is built from a {@code TestStream}.
+ */
+public class MockedUnboundedTable extends MockedTable {
+ private List<Pair<Duration, List<BeamSqlRow>>> timestampedRows = new ArrayList<>();
+ private int timestampField; // index of the date column used as each element's timestamp
+ private MockedUnboundedTable(BeamSqlRecordType beamSqlRecordType) {
+ super(beamSqlRecordType);
+ }
+
+ /**
+ * Convenient way to build a mocked table from alternating column type / column name args.
+ *
+ * <p>e.g.
+ *
+ * <pre>{@code
+ * MockedUnboundedTable
+ * .of(Types.BIGINT, "order_id",
+ * Types.INTEGER, "site_id",
+ * Types.DOUBLE, "price",
+ * Types.TIMESTAMP, "order_time")
+ * }</pre>
+ */
+ public static MockedUnboundedTable of(final Object... args){
+ List<Integer> types = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+ int lastTypeIndex = 0;
+ for (; lastTypeIndex < args.length; lastTypeIndex += 2) {
+ types.add((int) args[lastTypeIndex]);
+ names.add((String) args[lastTypeIndex + 1]);
+ }
+
+ return new MockedUnboundedTable(
+ BeamSqlRecordType.create(names, types)
+ );
+ }
+
+ public MockedUnboundedTable timestampColumnIndex(int idx) {
+ this.timestampField = idx;
+ return this;
+ }
+
+ public MockedUnboundedTable addRows(Duration duration, Object... args) {
+ List<BeamSqlRow> rows = new ArrayList<>();
+ int fieldCount = getRecordType().size();
+
+ for (int i = 0; i < args.length; i += fieldCount) {
+ BeamSqlRow row = new BeamSqlRow(getRecordType());
+ for (int j = 0; j < fieldCount; j++) {
+ row.addField(j, args[i + j]);
+ }
+ rows.add(row);
+ }
+
+ // remember the rows together with the watermark offset (from epoch 0) emitted before them
+ this.timestampedRows.add(Pair.of(duration, rows));
+ return this;
+ }
+
+ @Override public BeamIOType getSourceType() {
+ return BeamIOType.UNBOUNDED;
+ }
+
+ @Override public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
+ TestStream.Builder<BeamSqlRow> values = TestStream.create(
+ new BeamSqlRowCoder(beamSqlRecordType));
+
+ for (Pair<Duration, List<BeamSqlRow>> pair : timestampedRows) {
+ values = values.advanceWatermarkTo(new Instant(0).plus(pair.getKey()));
+ for (int i = 0; i < pair.getValue().size(); i++) {
+ values = values.addElements(TimestampedValue.of(pair.getValue().get(i),
+ new Instant(pair.getValue().get(i).getDate(timestampField))));
+ }
+ }
+
+ return pipeline.begin().apply(
+ "MockedUnboundedTable_" + COUNTER.incrementAndGet(),
+ values.advanceWatermarkToInfinity());
+ }
+
+ @Override public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
+ throw new UnsupportedOperationException("MockedUnboundedTable#buildIOWriter unsupported!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
new file mode 100644
index 0000000..505b742
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelBoundedVsBoundedTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Bounded + Bounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelBoundedVsBoundedTest {
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+ private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+
+ @BeforeClass
+ public static void prepare() {
+ beamSqlEnv.registerTable("ORDER_DETAILS",
+ MockedBeamSqlTable
+ .of(SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.INTEGER, "price",
+
+ 1, 2, 3,
+ 2, 3, 3,
+ 3, 4, 5));
+
+ beamSqlEnv.registerTable("ORDER_DETAILS0",
+ MockedBeamSqlTable
+ .of(SqlTypeName.INTEGER, "order_id0",
+ SqlTypeName.INTEGER, "site_id0",
+ SqlTypeName.INTEGER, "price0",
+
+ 1, 2, 3,
+ 2, 3, 3,
+ 3, 4, 5));
+
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1"
+ + " JOIN ORDER_DETAILS o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+ SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.INTEGER, "price",
+ SqlTypeName.INTEGER, "order_id0",
+ SqlTypeName.INTEGER, "site_id0",
+ SqlTypeName.INTEGER, "price0",
+
+ 2, 3, 3, 1, 2, 3
+ ).getInputRecords());
+ pipeline.run();
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1"
+ + " LEFT OUTER JOIN ORDER_DETAILS0 o2"
+ + " on "
+ + " o1.order_id=o2.site_id0 AND o2.price0=o1.site_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.enableAbandonedNodeEnforcement(false);
+ PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+ SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.INTEGER, "price",
+ SqlTypeName.INTEGER, "order_id0",
+ SqlTypeName.INTEGER, "site_id0",
+ SqlTypeName.INTEGER, "price0",
+
+ 1, 2, 3, null, null, null,
+ 2, 3, 3, 1, 2, 3,
+ 3, 4, 5, null, null, null
+ ).getInputRecords());
+ pipeline.run();
+ }
+
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1"
+ + " RIGHT OUTER JOIN ORDER_DETAILS o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+ SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.INTEGER, "price",
+ SqlTypeName.INTEGER, "order_id0",
+ SqlTypeName.INTEGER, "site_id0",
+ SqlTypeName.INTEGER, "price0",
+
+ 2, 3, 3, 1, 2, 3,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getInputRecords());
+ pipeline.run();
+ }
+
+ @Test
+ public void testFullOuterJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1"
+ + " FULL OUTER JOIN ORDER_DETAILS o2"
+ + " on "
+ + " o1.order_id=o2.site_id AND o2.price=o1.site_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of(
+ SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.INTEGER, "site_id",
+ SqlTypeName.INTEGER, "price",
+ SqlTypeName.INTEGER, "order_id0",
+ SqlTypeName.INTEGER, "site_id0",
+ SqlTypeName.INTEGER, "price0",
+
+ 2, 3, 3, 1, 2, 3,
+ 1, 2, 3, null, null, null,
+ 3, 4, 5, null, null, null,
+ null, null, null, 2, 3, 3,
+ null, null, null, 3, 4, 5
+ ).getInputRecords());
+ pipeline.run();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testException_nonEqualJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1"
+ + " JOIN ORDER_DETAILS o2"
+ + " on "
+ + " o1.order_id>o2.site_id"
+ ;
+
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testException_crossJoin() throws Exception {
+ String sql =
+ "SELECT * "
+ + "FROM ORDER_DETAILS o1, ORDER_DETAILS o2";
+
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
new file mode 100644
index 0000000..2ddb00b
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable;
+import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Bounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsBoundedTest {
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+ private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ public static final Date FIRST_DATE = new Date(1);
+ public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+ public static final Date THIRD_DATE = new Date(1 + 3600 * 1000 + 3600 * 1000 + 1);
+ private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+ @BeforeClass
+ public static void prepare() {
+ beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+ .of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.TIMESTAMP, "order_time"
+ )
+ .timestampColumnIndex(3)
+ .addRows(
+ Duration.ZERO,
+ 1, 1, 1, FIRST_DATE,
+ 1, 2, 2, FIRST_DATE
+ )
+ .addRows(
+ WINDOW_SIZE.plus(Duration.standardSeconds(1)),
+ 2, 2, 3, SECOND_DATE,
+ 2, 3, 3, SECOND_DATE,
+ // this late data is omitted
+ 1, 2, 3, FIRST_DATE
+ )
+ .addRows(
+ WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardSeconds(1)),
+ 3, 3, 3, THIRD_DATE,
+ // this late data is omitted
+ 2, 2, 3, SECOND_DATE
+ )
+ );
+
+ beamSqlEnv.registerTable("ORDER_DETAILS1", MockedBeamSqlTable
+ .of(SqlTypeName.INTEGER, "order_id",
+ SqlTypeName.VARCHAR, "buyer",
+
+ 1, "james",
+ 2, "bond"
+ ));
+ }
+
+ @Test
+ public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " JOIN "
+ + " ORDER_DETAILS1 o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.VARCHAR, "buyer"
+ ).values(
+ 1, 3, "james",
+ 2, 5, "bond"
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test
+ public void testInnerJoin_boundedTableOnTheLeftSide() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + " ORDER_DETAILS1 o2 "
+ + " JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.VARCHAR, "buyer"
+ ).values(
+ 1, 3, "james",
+ 2, 5, "bond"
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " LEFT OUTER JOIN "
+ + " ORDER_DETAILS1 o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("helloworld")));
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.VARCHAR, "buyer"
+ ).values(
+ 1, 3, "james",
+ 2, 5, "bond",
+ 3, 3, null
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testLeftOuterJoinError() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + " ORDER_DETAILS1 o2 "
+ + " LEFT OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + " ORDER_DETAILS1 o2 "
+ + " RIGHT OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.VARCHAR, "buyer"
+ ).values(
+ 1, 3, "james",
+ 2, 5, "bond",
+ 3, 3, null
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testRightOuterJoinError() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " RIGHT OUTER JOIN "
+ + " ORDER_DETAILS1 o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testFullOuterJoinError() throws Exception {
+ String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
+ + " ORDER_DETAILS1 o2 "
+ + " FULL OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
new file mode 100644
index 0000000..18a5f60
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamJoinRelUnboundedVsUnboundedTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.rel;
+
+import java.sql.Types;
+import java.util.Date;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
+import org.apache.beam.dsls.sql.TestUtils;
+import org.apache.beam.dsls.sql.planner.MockedUnboundedTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.transform.BeamSqlOutputToConsoleFn;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unbounded + Unbounded Test for {@code BeamJoinRel}.
+ */
+public class BeamJoinRelUnboundedVsUnboundedTest {
+ @Rule
+ public final TestPipeline pipeline = TestPipeline.create();
+ private static final BeamSqlEnv beamSqlEnv = new BeamSqlEnv();
+ public static final Date FIRST_DATE = new Date(1);
+ public static final Date SECOND_DATE = new Date(1 + 3600 * 1000);
+
+ private static final Duration WINDOW_SIZE = Duration.standardHours(1);
+
+ @BeforeClass
+ public static void prepare() {
+ beamSqlEnv.registerTable("ORDER_DETAILS", MockedUnboundedTable
+ .of(Types.INTEGER, "order_id",
+ Types.INTEGER, "site_id",
+ Types.INTEGER, "price",
+ Types.TIMESTAMP, "order_time"
+ )
+ .timestampColumnIndex(3)
+ .addRows(
+ Duration.ZERO,
+ 1, 1, 1, FIRST_DATE,
+ 1, 2, 6, FIRST_DATE
+ )
+ .addRows(
+ WINDOW_SIZE.plus(Duration.standardMinutes(1)),
+ 2, 2, 7, SECOND_DATE,
+ 2, 3, 8, SECOND_DATE,
+ // this late record is omitted (first window)
+ 1, 3, 3, FIRST_DATE
+ )
+ .addRows(
+ WINDOW_SIZE.plus(WINDOW_SIZE).plus(Duration.standardMinutes(1)),
+ // this late record is omitted (second window)
+ 2, 3, 3, SECOND_DATE
+ )
+ );
+ }
+
+ @Test
+ public void testInnerJoin() throws Exception {
+ String sql = "SELECT * FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "sum_site_id0").values(
+ 1, 3, 1, 3,
+ 2, 5, 2, 5
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ String sql = "SELECT * FROM "
+ + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " LEFT OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ // 1, 1 | 1, 3
+ // 2, 2 | NULL, NULL
+ // ---- | -----
+ // 2, 2 | 2, 5
+ // 3, 3 | NULL, NULL
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "sum_site_id0"
+ ).values(
+ 1, 1, 1, 3,
+ 2, 2, null, null,
+ 2, 2, 2, 5,
+ 3, 3, null, null
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test
+ public void testRightOuterJoin() throws Exception {
+ String sql = "SELECT * FROM "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " RIGHT OUTER JOIN "
+ + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id",
+ Types.INTEGER, "order_id0",
+ Types.INTEGER, "sum_site_id0"
+ ).values(
+ 1, 3, 1, 1,
+ null, null, 2, 2,
+ 2, 5, 2, 2,
+ null, null, 3, 3
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test
+ public void testFullOuterJoin() throws Exception {
+ String sql = "SELECT * FROM "
+ + "(select price as order_id1, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY price, TUMBLE(order_time, INTERVAL '1' HOUR)) o1 "
+ + " FULL OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id , TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+ + " on "
+ + " o1.order_id1=o2.order_id"
+ ;
+
+ PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ rows.apply(ParDo.of(new BeamSqlOutputToConsoleFn("hello")));
+ PAssert.that(rows.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+ .containsInAnyOrder(
+ TestUtils.RowsBuilder.of(
+ Types.INTEGER, "order_id1",
+ Types.INTEGER, "sum_site_id",
+ Types.INTEGER, "order_id",
+ Types.INTEGER, "sum_site_id0"
+ ).values(
+ 1, 1, 1, 3,
+ 6, 2, null, null,
+ 7, 2, null, null,
+ 8, 3, null, null,
+ null, null, 2, 5
+ ).getStringRows()
+ );
+ pipeline.run();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWindowsMismatch() throws Exception {
+ String sql = "SELECT * FROM "
+ + "(select site_id as order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY site_id, TUMBLE(order_time, INTERVAL '2' HOUR)) o1 "
+ + " LEFT OUTER JOIN "
+ + "(select order_id, sum(site_id) as sum_site_id FROM ORDER_DETAILS "
+ + " GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)) o2 "
+ + " on "
+ + " o1.order_id=o2.order_id"
+ ;
+ pipeline.enableAbandonedNodeEnforcement(false);
+ BeamSqlCli.compilePipeline(sql, pipeline, beamSqlEnv);
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/928cec59/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index b358fe1..f8eaa51 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -59,7 +59,7 @@ public class BeamSqlRowCoderTest {
BeamSqlRecordType beamSQLRecordType = CalciteUtils.toBeamRecordType(
protoRowType.apply(new JavaTypeFactoryImpl(
- RelDataTypeSystem.DEFAULT)));
+ RelDataTypeSystem.DEFAULT)));
BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
row.addField("col_tinyint", Byte.valueOf("1"));
row.addField("col_smallint", Short.valueOf("1"));
[2/2] beam git commit: [BEAM-2193] This closes #3277
Posted by ta...@apache.org.
[BEAM-2193] This closes #3277
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2096da25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2096da25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2096da25
Branch: refs/heads/DSL_SQL
Commit: 2096da25e85d97ab52850453c2130ff706d7bcdf
Parents: ab4b118 928cec5
Author: Tyler Akidau <ta...@apache.org>
Authored: Thu Jun 29 16:34:45 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Thu Jun 29 16:34:45 2017 -0700
----------------------------------------------------------------------
dsls/pom.xml | 2 +-
dsls/sql/pom.xml | 16 +-
.../beam/dsls/sql/planner/BeamRuleSets.java | 6 +-
.../beam/dsls/sql/rel/BeamAggregationRel.java | 19 +-
.../apache/beam/dsls/sql/rel/BeamJoinRel.java | 305 +++++++++++++++++++
.../apache/beam/dsls/sql/rule/BeamJoinRule.java | 53 ++++
.../beam/dsls/sql/schema/BeamSqlRecordType.java | 2 +-
.../apache/beam/dsls/sql/schema/BeamSqlRow.java | 2 +-
.../beam/dsls/sql/schema/BeamSqlRowCoder.java | 3 -
.../dsls/sql/transform/BeamJoinTransforms.java | 166 ++++++++++
.../org/apache/beam/dsls/sql/TestUtils.java | 125 ++++++++
.../dsls/sql/planner/MockedBeamSqlTable.java | 5 +-
.../beam/dsls/sql/planner/MockedTable.java | 33 ++
.../dsls/sql/planner/MockedUnboundedTable.java | 120 ++++++++
.../rel/BeamJoinRelBoundedVsBoundedTest.java | 195 ++++++++++++
.../rel/BeamJoinRelUnboundedVsBoundedTest.java | 242 +++++++++++++++
.../BeamJoinRelUnboundedVsUnboundedTest.java | 219 +++++++++++++
.../dsls/sql/schema/BeamSqlRowCoderTest.java | 2 +-
18 files changed, 1486 insertions(+), 29 deletions(-)
----------------------------------------------------------------------