You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:13 UTC
[50/59] beam git commit: move all implementation classes/packages
into impl package
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
deleted file mode 100644
index f1da29f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlFilterFn;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code Filter} node.
- *
- */
-public class BeamFilterRel extends Filter implements BeamRelNode {
-
- public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
- RexNode condition) {
- super(cluster, traits, child, condition);
- }
-
- @Override
- public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
- return new BeamFilterRel(getCluster(), traitSet, input, condition);
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
- BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
-
- PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
- ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
- filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return filterStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
deleted file mode 100644
index ce941a0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.base.Joiner;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
- public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
- Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
- List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
- super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
- sourceExpressionList, flattened);
- }
-
- @Override
- public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
- getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
- }
-
- /**
- * Note that {@code BeamIOSinkRel} returns the input PCollection,
- * which is the persisted PCollection.
- */
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
-
- upstream.apply(stageName, targetTable.buildIOWriter());
-
- return upstream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
deleted file mode 100644
index 85f0bc8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-
-/**
- * BeamRelNode to replace a {@code TableScan} node.
- *
- */
-public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
- public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
- super(cluster, traitSet, table);
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
- TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
- if (inputPCollections.has(sourceTupleTag)) {
- //choose PCollection from input PCollectionTuple if exists there.
- PCollection<BeamSqlRow> sourceStream = inputPCollections
- .get(new TupleTag<BeamSqlRow>(sourceName));
- return sourceStream;
- } else {
- //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
- BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
- return sourceTable.buildIOReader(inputPCollections.getPipeline())
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
deleted file mode 100644
index ae73a0d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Intersect} node.
- *
- * <p>This is used to combine two SELECT statements, but returns rows only from the
- * first SELECT statement that are identical to a row in the second SELECT statement.
- */
-public class BeamIntersectRel extends Intersect implements BeamRelNode {
- private BeamSetOperatorRelBase delegate;
- public BeamIntersectRel(
- RelOptCluster cluster,
- RelTraitSet traits,
- List<RelNode> inputs,
- boolean all) {
- super(cluster, traits, inputs, all);
- delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
- }
-
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
deleted file mode 100644
index 3d9c9cd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.transform.BeamJoinTransforms;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Pair;
-
-/**
- * {@code BeamRelNode} to replace a {@code Join} node.
- *
- * <p>Support for join can be categorized into 3 cases:
- * <ul>
- * <li>BoundedTable JOIN BoundedTable</li>
- * <li>UnboundedTable JOIN UnboundedTable</li>
- * <li>BoundedTable JOIN UnboundedTable</li>
- * </ul>
- *
- * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both
- * sides match.
- *
- * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
- * constraints:
- *
- * <ul>
- * <li>{@code FULL OUTER JOIN} is not supported.</li>
- * <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li>
- * <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li>
- * </ul>
- *
- *
- * <p>There are also some general constraints:
- *
- * <ul>
- * <li>Only equi-join is supported.</li>
- * <li>CROSS JOIN is not supported.</li>
- * </ul>
- */
-public class BeamJoinRel extends Join implements BeamRelNode {
- public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
- RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
- super(cluster, traits, left, right, condition, variablesSet, joinType);
- }
-
- @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
- RelNode right, JoinRelType joinType, boolean semiJoinDone) {
- return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
- joinType);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
- BeamSqlEnv sqlEnv)
- throws Exception {
- BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
- BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
- PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
- final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
- PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
- String stageName = BeamSqlRelUtils.getStageName(this);
- WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
- // extract the join fields
- List<Pair<Integer, Integer>> pairs = extractJoinColumns(
- leftRelNode.getRowType().getFieldCount());
-
- // build the extract key type
- // the name of the join field is not important
- List<String> names = new ArrayList<>(pairs.size());
- List<Integer> types = new ArrayList<>(pairs.size());
- for (int i = 0; i < pairs.size(); i++) {
- names.add("c" + i);
- types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
- }
- BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
-
- Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
-
- // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
- .apply(stageName + "_left_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
- .apply(stageName + "_right_ExtractJoinFields",
- MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
- .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
-
- // prepare the NullRows
- BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
- BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
-
- // a regular join
- if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
- try {
- leftWinFn.verifyCompatibility(rightWinFn);
- } catch (IncompatibleWindowException e) {
- throw new IllegalArgumentException(
- "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
- }
-
- return standardJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow, stageName);
- } else if (
- (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
- || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
- ) {
- // if one of the sides is Bounded & the other is Unbounded
- // then do a sideInput join
- // when doing a sideInput join, the windowFn does not need to match
- // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
- // the unbounded
- if (joinType == JoinRelType.FULL) {
- throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
- + "a bounded table with an unbounded table.");
- }
-
- if ((joinType == JoinRelType.LEFT
- && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
- || (joinType == JoinRelType.RIGHT
- && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
- throw new UnsupportedOperationException(
- "LEFT side of an OUTER JOIN must be Unbounded table.");
- }
-
- return sideInputJoin(extractedLeftRows, extractedRightRows,
- leftNullRow, rightNullRow);
- } else {
- throw new UnsupportedOperationException(
- "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
- }
- }
-
- private PCollection<BeamSqlRow> standardJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
- PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
- switch (joinType) {
- case LEFT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
- break;
- case RIGHT:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
- break;
- case FULL:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
- rightNullRow);
- break;
- case INNER:
- default:
- joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
- .innerJoin(extractedLeftRows, extractedRightRows);
- break;
- }
-
- PCollection<BeamSqlRow> ret = joinedRows
- .apply(stageName + "_JoinParts2WholeRow",
- MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
- return ret;
- }
-
- public PCollection<BeamSqlRow> sideInputJoin(
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
- BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
- // we always make the Unbounded table on the left to do the sideInput join
- // (will convert the result accordingly before return)
- boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
- JoinRelType realJoinType =
- (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
-
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
- swapped ? extractedRightRows : extractedLeftRows;
- PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
- swapped ? extractedLeftRows : extractedRightRows;
- BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
-
- // swapped still need to pass down because, we need to swap the result back.
- return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
- realRightNullRow, swapped);
- }
-
- private PCollection<BeamSqlRow> sideInputJoinHelper(
- JoinRelType joinType,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
- PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
- BeamSqlRow rightNullRow, boolean swapped) {
- final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
- .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
-
- PCollection<BeamSqlRow> ret = leftRows
- .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
- joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
- .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return ret;
- }
-
- private BeamSqlRow buildNullRow(BeamRelNode relNode) {
- BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
- BeamSqlRow nullRow = new BeamSqlRow(leftType);
- for (int i = 0; i < leftType.size(); i++) {
- nullRow.addField(i, null);
- }
- return nullRow;
- }
-
- private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
- // it's a CROSS JOIN because: condition == true
- if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
- throw new UnsupportedOperationException("CROSS JOIN is not supported!");
- }
-
- RexCall call = (RexCall) condition;
- List<Pair<Integer, Integer>> pairs = new ArrayList<>();
- if ("AND".equals(call.getOperator().getName())) {
- List<RexNode> operands = call.getOperands();
- for (RexNode rexNode : operands) {
- Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
- pairs.add(pair);
- }
- } else if ("=".equals(call.getOperator().getName())) {
- pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
- } else {
- throw new UnsupportedOperationException(
- "Operator " + call.getOperator().getName() + " is not supported in join condition");
- }
-
- return pairs;
- }
-
- private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
- int leftRowColumnCount) {
- List<RexNode> operands = oneCondition.getOperands();
- final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
-
- final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
- ((RexInputRef) operands.get(1)).getIndex());
- final int rightIndex = rightIndex1 - leftRowColumnCount;
-
- return new Pair<>(leftIndex, rightIndex);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
deleted file mode 100644
index 58b90ca..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convertion for Beam SQL.
- *
- */
-public enum BeamLogicalConvention implements Convention {
- INSTANCE;
-
- @Override
- public Class getInterface() {
- return BeamRelNode.class;
- }
-
- @Override
- public String getName() {
- return "BEAM_LOGICAL";
- }
-
- @Override
- public RelTraitDef getTraitDef() {
- return ConventionTraitDef.INSTANCE;
- }
-
- @Override
- public boolean satisfies(RelTrait trait) {
- return this == trait;
- }
-
- @Override
- public void register(RelOptPlanner planner) {
- }
-
- @Override
- public String toString() {
- return getName();
- }
-
- @Override
- public boolean canConvertConvention(Convention toConvention) {
- return false;
- }
-
- @Override
- public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
deleted file mode 100644
index 8cef971..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Minus} node.
- *
- * <p>Corresponds to the SQL {@code EXCEPT} operator.
- */
-public class BeamMinusRel extends Minus implements BeamRelNode {
-
- private BeamSetOperatorRelBase delegate;
-
- public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
- boolean all) {
- super(cluster, traits, inputs, all);
- delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
- }
-
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new BeamMinusRel(getCluster(), traitSet, inputs, all);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
deleted file mode 100644
index 8f81038..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlProjectFn;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code Project} node.
- *
- */
-public class BeamProjectRel extends Project implements BeamRelNode {
-
- /**
- * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
- *
- */
- public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
- List<? extends RexNode> projects, RelDataType rowType) {
- super(cluster, traits, input, projects, rowType);
- }
-
- @Override
- public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
- RelDataType rowType) {
- return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
- }
-
- @Override
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- String stageName = BeamSqlRelUtils.getStageName(this);
-
- PCollection<BeamSqlRow> upstream =
- BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
- BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
-
- PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
- .of(new BeamSqlProjectFn(getRelTypeName(), executor,
- CalciteUtils.toBeamRowType(rowType))));
- projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return projectStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
deleted file mode 100644
index 80a4b84..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
- */
-public interface BeamRelNode extends RelNode {
-
- /**
- * A {@link BeamRelNode} is a recursive structure, the
- * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
- * algorithm.
- */
- PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
- throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
deleted file mode 100644
index 7f80eb0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
- * and {@code BeamMinusRel}.
- */
-public class BeamSetOperatorRelBase {
- /**
- * Set operator type.
- */
- public enum OpType implements Serializable {
- UNION,
- INTERSECT,
- MINUS
- }
-
- private BeamRelNode beamRelNode;
- private List<RelNode> inputs;
- private boolean all;
- private OpType opType;
-
- public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
- List<RelNode> inputs, boolean all) {
- this.beamRelNode = beamRelNode;
- this.opType = opType;
- this.inputs = inputs;
- this.all = all;
- }
-
- public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
- .buildBeamPipeline(inputPCollections, sqlEnv);
- PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
- .buildBeamPipeline(inputPCollections, sqlEnv);
-
- WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
- WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
- if (!leftWindow.isCompatible(rightWindow)) {
- throw new IllegalArgumentException(
- "inputs of " + opType + " have different window strategy: "
- + leftWindow + " VS " + rightWindow);
- }
-
- final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
- final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
-
- // co-group
- String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
- PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
- .of(leftTag, leftRows.apply(
- stageName + "_CreateLeftIndex", MapElements.via(
- new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
- .and(rightTag, rightRows.apply(
- stageName + "_CreateRightIndex", MapElements.via(
- new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
- .apply(CoGroupByKey.<BeamSqlRow>create());
- PCollection<BeamSqlRow> ret = coGbkResultCollection
- .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
- opType, all)));
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
deleted file mode 100644
index 363c0a9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollationImpl;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamRelNode} to replace a {@code Sort} node.
- *
- * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement
- * the {@code Sort} algebra. The following types of ORDER BY are supported:
-
- * <pre>{@code
- * select * from t order by id desc limit 10;
- * select * from t order by id desc limit 10, 5;
- * }</pre>
- *
- * <p>but Order BY without a limit is NOT supported:
- *
- * <pre>{@code
- * select * from t order by id desc
- * }</pre>
- *
- * <h3>Constraints</h3>
- * <ul>
- * <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
- * must fit into the memory of a single machine.</li>
- * <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
- * it does not make much sense to use `ORDER BY` with `WINDOW`.
- * </li>
- * </ul>
- */
-public class BeamSortRel extends Sort implements BeamRelNode {
- private List<Integer> fieldIndices = new ArrayList<>();
- private List<Boolean> orientation = new ArrayList<>();
- private List<Boolean> nullsFirst = new ArrayList<>();
-
- private int startIndex = 0;
- private int count;
-
- public BeamSortRel(
- RelOptCluster cluster,
- RelTraitSet traits,
- RelNode child,
- RelCollation collation,
- RexNode offset,
- RexNode fetch) {
- super(cluster, traits, child, collation, offset, fetch);
-
- List<RexNode> fieldExps = getChildExps();
- RelCollationImpl collationImpl = (RelCollationImpl) collation;
- List<RelFieldCollation> collations = collationImpl.getFieldCollations();
- for (int i = 0; i < fieldExps.size(); i++) {
- RexNode fieldExp = fieldExps.get(i);
- RexInputRef inputRef = (RexInputRef) fieldExp;
- fieldIndices.add(inputRef.getIndex());
- orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
-
- RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
- if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
- rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
- }
- nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
- }
-
- if (fetch == null) {
- throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
- }
-
- RexLiteral fetchLiteral = (RexLiteral) fetch;
- count = ((BigDecimal) fetchLiteral.getValue()).intValue();
-
- if (offset != null) {
- RexLiteral offsetLiteral = (RexLiteral) offset;
- startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
- }
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- RelNode input = getInput();
- PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
- .buildBeamPipeline(inputPCollections, sqlEnv);
- Type windowType = upstream.getWindowingStrategy().getWindowFn()
- .getWindowTypeDescriptor().getType();
- if (!windowType.equals(GlobalWindow.class)) {
- throw new UnsupportedOperationException(
- "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
- }
-
- BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
- nullsFirst);
- // first find the top (offset + count)
- PCollection<List<BeamSqlRow>> rawStream =
- upstream.apply("extractTopOffsetAndFetch",
- Top.of(startIndex + count, comparator).withoutDefaults())
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-
- // strip the `leading offset`
- if (startIndex > 0) {
- rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
- new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
- .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
- }
-
- PCollection<BeamSqlRow> orderedStream = rawStream.apply(
- "flatten", Flatten.<BeamSqlRow>iterables());
- orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
- return orderedStream;
- }
-
- private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
- private int startIndex;
- private int endIndex;
-
- public SubListFn(int startIndex, int endIndex) {
- this.startIndex = startIndex;
- this.endIndex = endIndex;
- }
-
- @ProcessElement
- public void processElement(ProcessContext ctx) {
- ctx.output(ctx.element().subList(startIndex, endIndex));
- }
- }
-
- @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
- RexNode offset, RexNode fetch) {
- return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
- }
-
- private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
- private List<Integer> fieldsIndices;
- private List<Boolean> orientation;
- private List<Boolean> nullsFirst;
-
- public BeamSqlRowComparator(List<Integer> fieldsIndices,
- List<Boolean> orientation,
- List<Boolean> nullsFirst) {
- this.fieldsIndices = fieldsIndices;
- this.orientation = orientation;
- this.nullsFirst = nullsFirst;
- }
-
- @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
- for (int i = 0; i < fieldsIndices.size(); i++) {
- int fieldIndex = fieldsIndices.get(i);
- int fieldRet = 0;
- SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
- // whether NULL should be ordered first or last(compared to non-null values) depends on
- // what user specified in SQL(NULLS FIRST/NULLS LAST)
- if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
- continue;
- } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
- fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
- } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
- fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
- } else {
- switch (fieldType) {
- case TINYINT:
- fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
- break;
- case SMALLINT:
- fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
- break;
- case INTEGER:
- fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
- break;
- case BIGINT:
- fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
- break;
- case FLOAT:
- fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
- break;
- case DOUBLE:
- fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
- break;
- case VARCHAR:
- fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
- break;
- case DATE:
- fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
- break;
- default:
- throw new UnsupportedOperationException(
- "Data type: " + fieldType + " not supported yet!");
- }
- }
-
- fieldRet *= (orientation.get(i) ? -1 : 1);
- if (fieldRet != 0) {
- return fieldRet;
- }
- }
- return 0;
- }
- }
-
- public static <T extends Number & Comparable> int numberCompare(T a, T b) {
- return a.compareTo(b);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
deleted file mode 100644
index cc503d0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-class BeamSqlRelUtils {
- private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
-
- private static final AtomicInteger sequence = new AtomicInteger(0);
- private static final AtomicInteger classSequence = new AtomicInteger(0);
-
- public static String getStageName(BeamRelNode relNode) {
- return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
- + sequence.getAndIncrement();
- }
-
- public static String getClassName(BeamRelNode relNode) {
- return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
- + "_" + classSequence.getAndIncrement();
- }
-
- public static BeamRelNode getBeamRelInput(RelNode input) {
- if (input instanceof RelSubset) {
- // go with known best input
- input = ((RelSubset) input).getBest();
- }
- return (BeamRelNode) input;
- }
-
- public static String explain(final RelNode rel) {
- return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
- }
-
- public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
- String explain = "";
- try {
- explain = RelOptUtil.toString(rel);
- } catch (StackOverflowError e) {
- LOG.error("StackOverflowError occurred while extracting plan. "
- + "Please report it to the dev@ mailing list.");
- LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
- LOG.error("Forcing plan to empty string and continue... "
- + "SQL Runner may not working properly after.");
- }
- return explain;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
deleted file mode 100644
index 695521d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.core.Union;
-
-/**
- * {@link BeamRelNode} to replace a {@link Union}.
- *
- * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL
- * perspective, two cases are supported:
- *
- * <p>1) Do not use {@code grouped window function}:
- *
- * <pre>{@code
- * select * from person UNION select * from person
- * }</pre>
- *
- * <p>2) Use the same {@code grouped window function}, with the same param:
- * <pre>{@code
- * select id, count(*) from person
- * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * UNION
- * select * from person
- * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * }</pre>
- *
- * <p>Inputs with different group functions are NOT supported:
- * <pre>{@code
- * select id, count(*) from person
- * group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * UNION
- * select * from person
- * group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
- * }</pre>
- */
-public class BeamUnionRel extends Union implements BeamRelNode {
- private BeamSetOperatorRelBase delegate;
- public BeamUnionRel(RelOptCluster cluster,
- RelTraitSet traits,
- List<RelNode> inputs,
- boolean all) {
- super(cluster, traits, inputs, all);
- this.delegate = new BeamSetOperatorRelBase(this,
- BeamSetOperatorRelBase.OpType.UNION,
- inputs, all);
- }
-
- public BeamUnionRel(RelInput input) {
- super(input);
- }
-
- @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
- return new BeamUnionRel(getCluster(), traitSet, inputs, all);
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
deleted file mode 100644
index f3bf3a3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLiteral;
-
-/**
- * {@code BeamRelNode} to replace a {@code Values} node.
- *
- * <p>{@code BeamValuesRel} will be used in the following SQLs:
- * <ul>
- * <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
- * <li>{@code select 1, '1', LOCALTIME}</li>
- * </ul>
- */
-public class BeamValuesRel extends Values implements BeamRelNode {
-
- public BeamValuesRel(
- RelOptCluster cluster,
- RelDataType rowType,
- ImmutableList<ImmutableList<RexLiteral>> tuples,
- RelTraitSet traits) {
- super(cluster, rowType, tuples, traits);
-
- }
-
- @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
- , BeamSqlEnv sqlEnv) throws Exception {
- List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
- String stageName = BeamSqlRelUtils.getStageName(this);
- if (tuples.isEmpty()) {
- throw new IllegalStateException("Values with empty tuples!");
- }
-
- BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
- for (ImmutableList<RexLiteral> tuple : tuples) {
- BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
- for (int i = 0; i < tuple.size(); i++) {
- BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
- }
- rows.add(row);
- }
-
- return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
- .setCoder(new BeamSqlRowCoder(beamSQLRowType));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
deleted file mode 100644
index fb0a8e2..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.apache.beam.sdk.extensions.sql.rel;
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
deleted file mode 100644
index 17e3f80..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Duration;
-
-/**
- * Rule to detect the window/trigger settings.
- *
- */
-public class BeamAggregationRule extends RelOptRule {
- public static final BeamAggregationRule INSTANCE =
- new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
-
- public BeamAggregationRule(
- Class<? extends Aggregate> aggregateClass,
- Class<? extends Project> projectClass,
- RelBuilderFactory relBuilderFactory) {
- super(
- operand(aggregateClass,
- operand(projectClass, any())),
- relBuilderFactory, null);
- }
-
- public BeamAggregationRule(RelOptRuleOperand operand, String description) {
- super(operand, description);
- }
-
- @Override
- public void onMatch(RelOptRuleCall call) {
- final Aggregate aggregate = call.rel(0);
- final Project project = call.rel(1);
- updateWindowTrigger(call, aggregate, project);
- }
-
- private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
- Project project) {
- ImmutableBitSet groupByFields = aggregate.getGroupSet();
- List<RexNode> projectMapping = project.getProjects();
-
- WindowFn windowFn = new GlobalWindows();
- Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
- int windowFieldIdx = -1;
- Duration allowedLatence = Duration.ZERO;
-
- for (int groupField : groupByFields.asList()) {
- RexNode projNode = projectMapping.get(groupField);
- if (projNode instanceof RexCall) {
- SqlOperator op = ((RexCall) projNode).op;
- ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
- String functionName = op.getName();
- switch (functionName) {
- case "TUMBLE":
- windowFieldIdx = groupField;
- windowFn = FixedWindows
- .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
- if (parameters.size() == 3) {
- GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
- .getValue();
- triggerFn = createTriggerWithDelay(delayTime);
- allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
- }
- break;
- case "HOP":
- windowFieldIdx = groupField;
- windowFn = SlidingWindows
- .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
- .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
- if (parameters.size() == 4) {
- GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
- .getValue();
- triggerFn = createTriggerWithDelay(delayTime);
- allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
- }
- break;
- case "SESSION":
- windowFieldIdx = groupField;
- windowFn = Sessions
- .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
- if (parameters.size() == 3) {
- GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
- .getValue();
- triggerFn = createTriggerWithDelay(delayTime);
- allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
- }
- break;
- default:
- break;
- }
- }
- }
-
- BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
- aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(aggregate.getInput(),
- aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- aggregate.indicator,
- aggregate.getGroupSet(),
- aggregate.getGroupSets(),
- aggregate.getAggCallList(),
- windowFn,
- triggerFn,
- windowFieldIdx,
- allowedLatence);
- call.transformTo(newAggregator);
- }
-
- private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
- return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
- .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
- }
-
- private long getWindowParameterAsMillis(RexNode parameterNode) {
- if (parameterNode instanceof RexLiteral) {
- return RexLiteral.intValue(parameterNode);
- } else {
- throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
deleted file mode 100644
index b30a9d9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- *
- */
-public class BeamFilterRule extends ConverterRule {
- public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
- private BeamFilterRule() {
- super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final Filter filter = (Filter) rel;
- final RelNode input = filter.getInput();
-
- return new BeamFilterRel(filter.getCluster(),
- filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- filter.getCondition());
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index 54079b0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
-public class BeamIOSinkRule extends ConverterRule {
- public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
- private BeamIOSinkRule() {
- super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSinkRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableModify tableModify = (TableModify) rel;
- final RelNode input = tableModify.getInput();
-
- final RelOptCluster cluster = tableModify.getCluster();
- final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
- final RelOptTable relOptTable = tableModify.getTable();
- final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
- final RelNode convertedInput = convert(input,
- input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
- final TableModify.Operation operation = tableModify.getOperation();
- final List<String> updateColumnList = tableModify.getUpdateColumnList();
- final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
- final boolean flattened = tableModify.isFlattened();
-
- final Table table = tableModify.getTable().unwrap(Table.class);
-
- switch (table.getJdbcTableType()) {
- case TABLE:
- case STREAM:
- if (operation != TableModify.Operation.INSERT) {
- throw new UnsupportedOperationException(
- String.format("Streams doesn't support %s modify operation", operation));
- }
- return new BeamIOSinkRel(cluster, traitSet,
- relOptTable, catalogReader, convertedInput, operation, updateColumnList,
- sourceExpressionList, flattened);
- default:
- throw new IllegalArgumentException(
- String.format("Unsupported table type: %s", table.getJdbcTableType()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 496b977..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
-public class BeamIOSourceRule extends ConverterRule {
- public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
- private BeamIOSourceRule() {
- super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
- "BeamIOSourceRule");
- }
-
- @Override
- public RelNode convert(RelNode rel) {
- final TableScan scan = (TableScan) rel;
-
- return new BeamIOSourceRel(scan.getCluster(),
- scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
deleted file mode 100644
index 6fdbd9b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-
-/**
- * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
- */
-public class BeamIntersectRule extends ConverterRule {
- public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
- private BeamIntersectRule() {
- super(LogicalIntersect.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Intersect intersect = (Intersect) rel;
- final List<RelNode> inputs = intersect.getInputs();
- return new BeamIntersectRel(
- intersect.getCluster(),
- intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convertList(inputs, BeamLogicalConvention.INSTANCE),
- intersect.all
- );
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
deleted file mode 100644
index 147932e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.logical.LogicalJoin;
-
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
-public class BeamJoinRule extends ConverterRule {
- public static final BeamJoinRule INSTANCE = new BeamJoinRule();
- private BeamJoinRule() {
- super(LogicalJoin.class, Convention.NONE,
- BeamLogicalConvention.INSTANCE, "BeamJoinRule");
- }
-
- @Override public RelNode convert(RelNode rel) {
- Join join = (Join) rel;
- return new BeamJoinRel(
- join.getCluster(),
- join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
- convert(join.getLeft(),
- join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- convert(join.getRight(),
- join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
- join.getCondition(),
- join.getVariablesSet(),
- join.getJoinType()
- );
- }
-}