You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:13 UTC

[50/59] beam git commit: move all implementation classes/packages into impl package

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
deleted file mode 100644
index f1da29f..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamFilterRel.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlFilterFn;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code Filter} node.
- *
- */
-public class BeamFilterRel extends Filter implements BeamRelNode {
-
-  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
-      RexNode condition) {
-    super(cluster, traits, child, condition);
-  }
-
-  @Override
-  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-    return new BeamFilterRel(getCluster(), traitSet, input, condition);
-  }
-
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
-
-    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
-    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
-
-    PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
-        ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return filterStream;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
deleted file mode 100644
index ce941a0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSinkRel.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.base.Joiner;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
-  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
-      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
-      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
-    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
-        sourceExpressionList, flattened);
-  }
-
-  @Override
-  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
-        getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
-  }
-
-  /**
-   * Note that {@code BeamIOSinkRel} returns the input PCollection,
-   * which is the persisted PCollection.
-   */
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
-
-    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-    BaseBeamTable targetTable = sqlEnv.findTable(sourceName);
-
-    upstream.apply(stageName, targetTable.buildIOWriter());
-
-    return upstream;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
deleted file mode 100644
index 85f0bc8..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIOSourceRel.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-
-/**
- * BeamRelNode to replace a {@code TableScan} node.
- *
- */
-public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
-  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-    super(cluster, traitSet, table);
-  }
-
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<>(sourceName);
-    if (inputPCollections.has(sourceTupleTag)) {
-      //choose PCollection from input PCollectionTuple if exists there.
-      PCollection<BeamSqlRow> sourceStream = inputPCollections
-          .get(new TupleTag<BeamSqlRow>(sourceName));
-      return sourceStream;
-    } else {
-      //If not, the source PColection is provided with BaseBeamTable.buildIOReader().
-      BaseBeamTable sourceTable = sqlEnv.findTable(sourceName);
-      return sourceTable.buildIOReader(inputPCollections.getPipeline())
-          .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
deleted file mode 100644
index ae73a0d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamIntersectRel.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Intersect} node.
- *
- * <p>This is used to combine two SELECT statements, but returns rows only from the
- * first SELECT statement that are identical to a row in the second SELECT statement.
- */
-public class BeamIntersectRel extends Intersect implements BeamRelNode {
-  private BeamSetOperatorRelBase delegate;
-  public BeamIntersectRel(
-      RelOptCluster cluster,
-      RelTraitSet traits,
-      List<RelNode> inputs,
-      boolean all) {
-    super(cluster, traits, inputs, all);
-    delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.INTERSECT, inputs, all);
-  }
-
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-    return new BeamIntersectRel(getCluster(), traitSet, inputs, all);
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
deleted file mode 100644
index 3d9c9cd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamJoinRel.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.transform.BeamJoinTransforms;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.CorrelationId;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.Pair;
-
-/**
- * {@code BeamRelNode} to replace a {@code Join} node.
- *
- * <p>Support for join can be categorized into 3 cases:
- * <ul>
- *   <li>BoundedTable JOIN BoundedTable</li>
- *   <li>UnboundedTable JOIN UnboundedTable</li>
- *   <li>BoundedTable JOIN UnboundedTable</li>
- * </ul>
- *
- * <p>For the first two cases, a standard join is utilized as long as the windowFn of the both
- * sides match.
- *
- * <p>For the third case, {@code sideInput} is utilized to implement the join, so there are some
- * constraints:
- *
- * <ul>
- *   <li>{@code FULL OUTER JOIN} is not supported.</li>
- *   <li>If it's a {@code LEFT OUTER JOIN}, the unbounded table should on the left side.</li>
- *   <li>If it's a {@code RIGHT OUTER JOIN}, the unbounded table should on the right side.</li>
- * </ul>
- *
- *
- * <p>There are also some general constraints:
- *
- * <ul>
- *  <li>Only equi-join is supported.</li>
- *  <li>CROSS JOIN is not supported.</li>
- * </ul>
- */
-public class BeamJoinRel extends Join implements BeamRelNode {
-  public BeamJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
-      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
-    super(cluster, traits, left, right, condition, variablesSet, joinType);
-  }
-
-  @Override public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left,
-      RelNode right, JoinRelType joinType, boolean semiJoinDone) {
-    return new BeamJoinRel(getCluster(), traitSet, left, right, conditionExpr, variablesSet,
-        joinType);
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections,
-      BeamSqlEnv sqlEnv)
-      throws Exception {
-    BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
-    BeamSqlRowType leftRowType = CalciteUtils.toBeamRowType(left.getRowType());
-    PCollection<BeamSqlRow> leftRows = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
-    final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
-    PCollection<BeamSqlRow> rightRows = rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
-    String stageName = BeamSqlRelUtils.getStageName(this);
-    WindowFn leftWinFn = leftRows.getWindowingStrategy().getWindowFn();
-    WindowFn rightWinFn = rightRows.getWindowingStrategy().getWindowFn();
-
-    // extract the join fields
-    List<Pair<Integer, Integer>> pairs = extractJoinColumns(
-        leftRelNode.getRowType().getFieldCount());
-
-    // build the extract key type
-    // the name of the join field is not important
-    List<String> names = new ArrayList<>(pairs.size());
-    List<Integer> types = new ArrayList<>(pairs.size());
-    for (int i = 0; i < pairs.size(); i++) {
-      names.add("c" + i);
-      types.add(leftRowType.getFieldsType().get(pairs.get(i).getKey()));
-    }
-    BeamSqlRowType extractKeyRowType = BeamSqlRowType.create(names, types);
-
-    Coder extractKeyRowCoder = new BeamSqlRowCoder(extractKeyRowType);
-
-    // BeamSqlRow -> KV<BeamSqlRow, BeamSqlRow>
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows = leftRows
-        .apply(stageName + "_left_ExtractJoinFields",
-            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(true, pairs)))
-        .setCoder(KvCoder.of(extractKeyRowCoder, leftRows.getCoder()));
-
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows = rightRows
-        .apply(stageName + "_right_ExtractJoinFields",
-            MapElements.via(new BeamJoinTransforms.ExtractJoinFields(false, pairs)))
-        .setCoder(KvCoder.of(extractKeyRowCoder, rightRows.getCoder()));
-
-    // prepare the NullRows
-    BeamSqlRow leftNullRow = buildNullRow(leftRelNode);
-    BeamSqlRow rightNullRow = buildNullRow(rightRelNode);
-
-    // a regular join
-    if ((leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-           || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
-                && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)) {
-      try {
-        leftWinFn.verifyCompatibility(rightWinFn);
-      } catch (IncompatibleWindowException e) {
-        throw new IllegalArgumentException(
-            "WindowFns must match for a bounded-vs-bounded/unbounded-vs-unbounded join.", e);
-      }
-
-      return standardJoin(extractedLeftRows, extractedRightRows,
-          leftNullRow, rightNullRow, stageName);
-    } else if (
-        (leftRows.isBounded() == PCollection.IsBounded.BOUNDED
-        && rightRows.isBounded() == PCollection.IsBounded.UNBOUNDED)
-        || (leftRows.isBounded() == PCollection.IsBounded.UNBOUNDED
-            && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)
-        ) {
-      // if one of the sides is Bounded & the other is Unbounded
-      // then do a sideInput join
-      // when doing a sideInput join, the windowFn does not need to match
-      // Only support INNER JOIN & LEFT OUTER JOIN where left side of the join must be
-      // the unbounded
-      if (joinType == JoinRelType.FULL) {
-        throw new UnsupportedOperationException("FULL OUTER JOIN is not supported when join "
-            + "a bounded table with an unbounded table.");
-      }
-
-      if ((joinType == JoinRelType.LEFT
-          && leftRows.isBounded() == PCollection.IsBounded.BOUNDED)
-          || (joinType == JoinRelType.RIGHT
-          && rightRows.isBounded() == PCollection.IsBounded.BOUNDED)) {
-        throw new UnsupportedOperationException(
-            "LEFT side of an OUTER JOIN must be Unbounded table.");
-      }
-
-      return sideInputJoin(extractedLeftRows, extractedRightRows,
-          leftNullRow, rightNullRow);
-    } else {
-      throw new UnsupportedOperationException(
-          "The inputs to the JOIN have un-joinnable windowFns: " + leftWinFn + ", " + rightWinFn);
-    }
-  }
-
-  private PCollection<BeamSqlRow> standardJoin(
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
-      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow, String stageName) {
-    PCollection<KV<BeamSqlRow, KV<BeamSqlRow, BeamSqlRow>>> joinedRows = null;
-    switch (joinType) {
-      case LEFT:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .leftOuterJoin(extractedLeftRows, extractedRightRows, rightNullRow);
-        break;
-      case RIGHT:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .rightOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow);
-        break;
-      case FULL:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .fullOuterJoin(extractedLeftRows, extractedRightRows, leftNullRow,
-            rightNullRow);
-        break;
-      case INNER:
-      default:
-        joinedRows = org.apache.beam.sdk.extensions.joinlibrary.Join
-            .innerJoin(extractedLeftRows, extractedRightRows);
-        break;
-    }
-
-    PCollection<BeamSqlRow> ret = joinedRows
-        .apply(stageName + "_JoinParts2WholeRow",
-            MapElements.via(new BeamJoinTransforms.JoinParts2WholeRow()))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-    return ret;
-  }
-
-  public PCollection<BeamSqlRow> sideInputJoin(
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedLeftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> extractedRightRows,
-      BeamSqlRow leftNullRow, BeamSqlRow rightNullRow) {
-    // we always make the Unbounded table on the left to do the sideInput join
-    // (will convert the result accordingly before return)
-    boolean swapped = (extractedLeftRows.isBounded() == PCollection.IsBounded.BOUNDED);
-    JoinRelType realJoinType =
-        (swapped && joinType != JoinRelType.INNER) ? JoinRelType.LEFT : joinType;
-
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> realLeftRows =
-        swapped ? extractedRightRows : extractedLeftRows;
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> realRightRows =
-        swapped ? extractedLeftRows : extractedRightRows;
-    BeamSqlRow realRightNullRow = swapped ? leftNullRow : rightNullRow;
-
-    // swapped still need to pass down because, we need to swap the result back.
-    return sideInputJoinHelper(realJoinType, realLeftRows, realRightRows,
-        realRightNullRow, swapped);
-  }
-
-  private PCollection<BeamSqlRow> sideInputJoinHelper(
-      JoinRelType joinType,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> leftRows,
-      PCollection<KV<BeamSqlRow, BeamSqlRow>> rightRows,
-      BeamSqlRow rightNullRow, boolean swapped) {
-    final PCollectionView<Map<BeamSqlRow, Iterable<BeamSqlRow>>> rowsView = rightRows
-        .apply(View.<BeamSqlRow, BeamSqlRow>asMultimap());
-
-    PCollection<BeamSqlRow> ret = leftRows
-        .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
-            joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
-        .setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return ret;
-  }
-
-  private BeamSqlRow buildNullRow(BeamRelNode relNode) {
-    BeamSqlRowType leftType = CalciteUtils.toBeamRowType(relNode.getRowType());
-    BeamSqlRow nullRow = new BeamSqlRow(leftType);
-    for (int i = 0; i < leftType.size(); i++) {
-      nullRow.addField(i, null);
-    }
-    return nullRow;
-  }
-
-  private List<Pair<Integer, Integer>> extractJoinColumns(int leftRowColumnCount) {
-    // it's a CROSS JOIN because: condition == true
-    if (condition instanceof RexLiteral && (Boolean) ((RexLiteral) condition).getValue()) {
-      throw new UnsupportedOperationException("CROSS JOIN is not supported!");
-    }
-
-    RexCall call = (RexCall) condition;
-    List<Pair<Integer, Integer>> pairs = new ArrayList<>();
-    if ("AND".equals(call.getOperator().getName())) {
-      List<RexNode> operands = call.getOperands();
-      for (RexNode rexNode : operands) {
-        Pair<Integer, Integer> pair = extractOneJoinColumn((RexCall) rexNode, leftRowColumnCount);
-        pairs.add(pair);
-      }
-    } else if ("=".equals(call.getOperator().getName())) {
-      pairs.add(extractOneJoinColumn(call, leftRowColumnCount));
-    } else {
-      throw new UnsupportedOperationException(
-          "Operator " + call.getOperator().getName() + " is not supported in join condition");
-    }
-
-    return pairs;
-  }
-
-  private Pair<Integer, Integer> extractOneJoinColumn(RexCall oneCondition,
-      int leftRowColumnCount) {
-    List<RexNode> operands = oneCondition.getOperands();
-    final int leftIndex = Math.min(((RexInputRef) operands.get(0)).getIndex(),
-        ((RexInputRef) operands.get(1)).getIndex());
-
-    final int rightIndex1 = Math.max(((RexInputRef) operands.get(0)).getIndex(),
-        ((RexInputRef) operands.get(1)).getIndex());
-    final int rightIndex = rightIndex1 - leftRowColumnCount;
-
-    return new Pair<>(leftIndex, rightIndex);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
deleted file mode 100644
index 58b90ca..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamLogicalConvention.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convertion for Beam SQL.
- *
- */
-public enum BeamLogicalConvention implements Convention {
-  INSTANCE;
-
-  @Override
-  public Class getInterface() {
-    return BeamRelNode.class;
-  }
-
-  @Override
-  public String getName() {
-    return "BEAM_LOGICAL";
-  }
-
-  @Override
-  public RelTraitDef getTraitDef() {
-    return ConventionTraitDef.INSTANCE;
-  }
-
-  @Override
-  public boolean satisfies(RelTrait trait) {
-    return this == trait;
-  }
-
-  @Override
-  public void register(RelOptPlanner planner) {
-  }
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  @Override
-  public boolean canConvertConvention(Convention toConvention) {
-    return false;
-  }
-
-  @Override
-  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
deleted file mode 100644
index 8cef971..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamMinusRel.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.core.SetOp;
-
-/**
- * {@code BeamRelNode} to replace a {@code Minus} node.
- *
- * <p>Corresponds to the SQL {@code EXCEPT} operator.
- */
-public class BeamMinusRel extends Minus implements BeamRelNode {
-
-  private BeamSetOperatorRelBase delegate;
-
-  public BeamMinusRel(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs,
-      boolean all) {
-    super(cluster, traits, inputs, all);
-    delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.MINUS, inputs, all);
-  }
-
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-    return new BeamMinusRel(getCluster(), traitSet, inputs, all);
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
deleted file mode 100644
index 8f81038..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamProjectRel.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlExpressionExecutor;
-import org.apache.beam.sdk.extensions.sql.interpreter.BeamSqlFnExecutor;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSqlProjectFn;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-
-/**
- * BeamRelNode to replace a {@code Project} node.
- *
- */
-public class BeamProjectRel extends Project implements BeamRelNode {
-
-  /**
-   * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
-   *
-   */
-  public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-      List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traits, input, projects, rowType);
-  }
-
-  @Override
-  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
-      RelDataType rowType) {
-    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
-  }
-
-  @Override
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    String stageName = BeamSqlRelUtils.getStageName(this);
-
-    PCollection<BeamSqlRow> upstream =
-        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections, sqlEnv);
-
-    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
-
-    PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSqlProjectFn(getRelTypeName(), executor,
-            CalciteUtils.toBeamRowType(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return projectStream;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
deleted file mode 100644
index 80a4b84..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * A new method {@link #buildBeamPipeline(PCollectionTuple, BeamSqlEnv)} is added.
- */
-public interface BeamRelNode extends RelNode {
-
-  /**
-   * A {@link BeamRelNode} is a recursive structure, the
-   * {@code BeamQueryPlanner} visits it with a DFS(Depth-First-Search)
-   * algorithm.
-   */
-  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
-      throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
deleted file mode 100644
index 7f80eb0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSetOperatorRelBase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.transform.BeamSetOperatorsTransforms;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.join.CoGbkResult;
-import org.apache.beam.sdk.transforms.join.CoGroupByKey;
-import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.calcite.rel.RelNode;
-
-/**
- * Delegate for Set operators: {@code BeamUnionRel}, {@code BeamIntersectRel}
- * and {@code BeamMinusRel}.
- */
-public class BeamSetOperatorRelBase {
-  /**
-   * Set operator type.
-   */
-  public enum OpType implements Serializable {
-    UNION,
-    INTERSECT,
-    MINUS
-  }
-
-  private BeamRelNode beamRelNode;
-  private List<RelNode> inputs;
-  private boolean all;
-  private OpType opType;
-
-  public BeamSetOperatorRelBase(BeamRelNode beamRelNode, OpType opType,
-      List<RelNode> inputs, boolean all) {
-    this.beamRelNode = beamRelNode;
-    this.opType = opType;
-    this.inputs = inputs;
-    this.all = all;
-  }
-
-  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    PCollection<BeamSqlRow> leftRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(0))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    PCollection<BeamSqlRow> rightRows = BeamSqlRelUtils.getBeamRelInput(inputs.get(1))
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-
-    WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
-    WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
-    if (!leftWindow.isCompatible(rightWindow)) {
-      throw new IllegalArgumentException(
-          "inputs of " + opType + " have different window strategy: "
-          + leftWindow + " VS " + rightWindow);
-    }
-
-    final TupleTag<BeamSqlRow> leftTag = new TupleTag<>();
-    final TupleTag<BeamSqlRow> rightTag = new TupleTag<>();
-
-    // co-group
-    String stageName = BeamSqlRelUtils.getStageName(beamRelNode);
-    PCollection<KV<BeamSqlRow, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple
-        .of(leftTag, leftRows.apply(
-            stageName + "_CreateLeftIndex", MapElements.via(
-                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
-        .and(rightTag, rightRows.apply(
-            stageName + "_CreateRightIndex", MapElements.via(
-                new BeamSetOperatorsTransforms.BeamSqlRow2KvFn())))
-        .apply(CoGroupByKey.<BeamSqlRow>create());
-    PCollection<BeamSqlRow> ret = coGbkResultCollection
-        .apply(ParDo.of(new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(leftTag, rightTag,
-            opType, all)));
-    return ret;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
deleted file mode 100644
index 363c0a9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSortRel.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollationImpl;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamRelNode} to replace a {@code Sort} node.
- *
- * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement
- * the {@code Sort} algebra. The following types of ORDER BY are supported:
-
- * <pre>{@code
- *     select * from t order by id desc limit 10;
- *     select * from t order by id desc limit 10, 5;
- * }</pre>
- *
- * <p>but Order BY without a limit is NOT supported:
- *
- * <pre>{@code
- *   select * from t order by id desc
- * }</pre>
- *
- * <h3>Constraints</h3>
- * <ul>
- *   <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
- *   must fit into the memory of a single machine.</li>
- *   <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
- *   it does not make much sense to use `ORDER BY` with `WINDOW`.
- *   </li>
- * </ul>
- */
-public class BeamSortRel extends Sort implements BeamRelNode {
-  private List<Integer> fieldIndices = new ArrayList<>();
-  private List<Boolean> orientation = new ArrayList<>();
-  private List<Boolean> nullsFirst = new ArrayList<>();
-
-  private int startIndex = 0;
-  private int count;
-
-  public BeamSortRel(
-      RelOptCluster cluster,
-      RelTraitSet traits,
-      RelNode child,
-      RelCollation collation,
-      RexNode offset,
-      RexNode fetch) {
-    super(cluster, traits, child, collation, offset, fetch);
-
-    List<RexNode> fieldExps = getChildExps();
-    RelCollationImpl collationImpl = (RelCollationImpl) collation;
-    List<RelFieldCollation> collations = collationImpl.getFieldCollations();
-    for (int i = 0; i < fieldExps.size(); i++) {
-      RexNode fieldExp = fieldExps.get(i);
-      RexInputRef inputRef = (RexInputRef) fieldExp;
-      fieldIndices.add(inputRef.getIndex());
-      orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
-
-      RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
-      if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
-        rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
-      }
-      nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
-    }
-
-    if (fetch == null) {
-      throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
-    }
-
-    RexLiteral fetchLiteral = (RexLiteral) fetch;
-    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
-
-    if (offset != null) {
-      RexLiteral offsetLiteral = (RexLiteral) offset;
-      startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
-    }
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    Type windowType = upstream.getWindowingStrategy().getWindowFn()
-        .getWindowTypeDescriptor().getType();
-    if (!windowType.equals(GlobalWindow.class)) {
-      throw new UnsupportedOperationException(
-          "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
-    }
-
-    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
-        nullsFirst);
-    // first find the top (offset + count)
-    PCollection<List<BeamSqlRow>> rawStream =
-        upstream.apply("extractTopOffsetAndFetch",
-            Top.of(startIndex + count, comparator).withoutDefaults())
-        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-
-    // strip the `leading offset`
-    if (startIndex > 0) {
-      rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
-          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-    }
-
-    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
-        "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return orderedStream;
-  }
-
-  private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
-    private int startIndex;
-    private int endIndex;
-
-    public SubListFn(int startIndex, int endIndex) {
-      this.startIndex = startIndex;
-      this.endIndex = endIndex;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().subList(startIndex, endIndex));
-    }
-  }
-
-  @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
-      RexNode offset, RexNode fetch) {
-    return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
-  }
-
-  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
-    private List<Integer> fieldsIndices;
-    private List<Boolean> orientation;
-    private List<Boolean> nullsFirst;
-
-    public BeamSqlRowComparator(List<Integer> fieldsIndices,
-        List<Boolean> orientation,
-        List<Boolean> nullsFirst) {
-      this.fieldsIndices = fieldsIndices;
-      this.orientation = orientation;
-      this.nullsFirst = nullsFirst;
-    }
-
-    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
-      for (int i = 0; i < fieldsIndices.size(); i++) {
-        int fieldIndex = fieldsIndices.get(i);
-        int fieldRet = 0;
-        SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
-        // whether NULL should be ordered first or last(compared to non-null values) depends on
-        // what user specified in SQL(NULLS FIRST/NULLS LAST)
-        if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
-          continue;
-        } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
-          fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
-        } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
-          fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
-        } else {
-          switch (fieldType) {
-            case TINYINT:
-              fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
-              break;
-            case SMALLINT:
-              fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
-              break;
-            case INTEGER:
-              fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
-              break;
-            case BIGINT:
-              fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
-              break;
-            case FLOAT:
-              fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
-              break;
-            case DOUBLE:
-              fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
-              break;
-            case VARCHAR:
-              fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
-              break;
-            case DATE:
-              fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
-              break;
-            default:
-              throw new UnsupportedOperationException(
-                  "Data type: " + fieldType + " not supported yet!");
-          }
-        }
-
-        fieldRet *= (orientation.get(i) ? -1 : 1);
-        if (fieldRet != 0) {
-          return fieldRet;
-        }
-      }
-      return 0;
-    }
-  }
-
-  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
-    return a.compareTo(b);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
deleted file mode 100644
index cc503d0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamSqlRelUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-class BeamSqlRelUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
-
-  private static final AtomicInteger sequence = new AtomicInteger(0);
-  private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-  public static String getStageName(BeamRelNode relNode) {
-    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
-        + sequence.getAndIncrement();
-  }
-
-  public static String getClassName(BeamRelNode relNode) {
-    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
-        + "_" + classSequence.getAndIncrement();
-  }
-
-  public static BeamRelNode getBeamRelInput(RelNode input) {
-    if (input instanceof RelSubset) {
-      // go with known best input
-      input = ((RelSubset) input).getBest();
-    }
-    return (BeamRelNode) input;
-  }
-
-  public static String explain(final RelNode rel) {
-    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-  }
-
-  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-    String explain = "";
-    try {
-      explain = RelOptUtil.toString(rel);
-    } catch (StackOverflowError e) {
-      LOG.error("StackOverflowError occurred while extracting plan. "
-          + "Please report it to the dev@ mailing list.");
-      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-      LOG.error("Forcing plan to empty string and continue... "
-          + "SQL Runner may not working properly after.");
-    }
-    return explain;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
deleted file mode 100644
index 695521d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamUnionRel.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.core.Union;
-
-/**
- * {@link BeamRelNode} to replace a {@link Union}.
- *
- * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL
- * perspective, two cases are supported:
- *
- * <p>1) Do not use {@code grouped window function}:
- *
- * <pre>{@code
- *   select * from person UNION select * from person
- * }</pre>
- *
- * <p>2) Use the same {@code grouped window function}, with the same param:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * }</pre>
- *
- * <p>Inputs with different group functions are NOT supported:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
- * }</pre>
- */
-public class BeamUnionRel extends Union implements BeamRelNode {
-  private BeamSetOperatorRelBase delegate;
-  public BeamUnionRel(RelOptCluster cluster,
-      RelTraitSet traits,
-      List<RelNode> inputs,
-      boolean all) {
-    super(cluster, traits, inputs, all);
-    this.delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.UNION,
-        inputs, all);
-  }
-
-  public BeamUnionRel(RelInput input) {
-    super(input);
-  }
-
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
deleted file mode 100644
index f3bf3a3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/BeamValuesRel.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rel;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.BeamSqlEnv;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
-import org.apache.beam.sdk.extensions.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLiteral;
-
-/**
- * {@code BeamRelNode} to replace a {@code Values} node.
- *
- * <p>{@code BeamValuesRel} will be used in the following SQLs:
- * <ul>
- *   <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
- *   <li>{@code select 1, '1', LOCALTIME}</li>
- * </ul>
- */
-public class BeamValuesRel extends Values implements BeamRelNode {
-
-  public BeamValuesRel(
-      RelOptCluster cluster,
-      RelDataType rowType,
-      ImmutableList<ImmutableList<RexLiteral>> tuples,
-      RelTraitSet traits) {
-    super(cluster, rowType, tuples, traits);
-
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
-    String stageName = BeamSqlRelUtils.getStageName(this);
-    if (tuples.isEmpty()) {
-      throw new IllegalStateException("Values with empty tuples!");
-    }
-
-    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
-    for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
-      for (int i = 0; i < tuple.size(); i++) {
-        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
-      }
-      rows.add(row);
-    }
-
-    return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
-        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
deleted file mode 100644
index fb0a8e2..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.apache.beam.sdk.extensions.sql.rel;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
deleted file mode 100644
index 17e3f80..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamAggregationRule.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamAggregationRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Duration;
-
-/**
- * Rule to detect the window/trigger settings.
- *
- */
-public class BeamAggregationRule extends RelOptRule {
-  public static final BeamAggregationRule INSTANCE =
-      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
-
-  public BeamAggregationRule(
-      Class<? extends Aggregate> aggregateClass,
-      Class<? extends Project> projectClass,
-      RelBuilderFactory relBuilderFactory) {
-    super(
-        operand(aggregateClass,
-            operand(projectClass, any())),
-        relBuilderFactory, null);
-  }
-
-  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
-    super(operand, description);
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final Aggregate aggregate = call.rel(0);
-    final Project project = call.rel(1);
-    updateWindowTrigger(call, aggregate, project);
-  }
-
-  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
-      Project project) {
-    ImmutableBitSet groupByFields = aggregate.getGroupSet();
-    List<RexNode> projectMapping = project.getProjects();
-
-    WindowFn windowFn = new GlobalWindows();
-    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
-    int windowFieldIdx = -1;
-    Duration allowedLatence = Duration.ZERO;
-
-    for (int groupField : groupByFields.asList()) {
-      RexNode projNode = projectMapping.get(groupField);
-      if (projNode instanceof RexCall) {
-        SqlOperator op = ((RexCall) projNode).op;
-        ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
-        String functionName = op.getName();
-        switch (functionName) {
-        case "TUMBLE":
-          windowFieldIdx = groupField;
-          windowFn = FixedWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          if (parameters.size() == 3) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        case "HOP":
-          windowFieldIdx = groupField;
-          windowFn = SlidingWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
-              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
-          if (parameters.size() == 4) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        case "SESSION":
-          windowFieldIdx = groupField;
-          windowFn = Sessions
-              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          if (parameters.size() == 3) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        default:
-          break;
-        }
-      }
-    }
-
-    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
-        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(aggregate.getInput(),
-            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        aggregate.indicator,
-        aggregate.getGroupSet(),
-        aggregate.getGroupSets(),
-        aggregate.getAggCallList(),
-        windowFn,
-        triggerFn,
-        windowFieldIdx,
-        allowedLatence);
-    call.transformTo(newAggregator);
-  }
-
-  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
-    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
-        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
-  }
-
-  private long getWindowParameterAsMillis(RexNode parameterNode) {
-    if (parameterNode instanceof RexLiteral) {
-      return RexLiteral.intValue(parameterNode);
-    } else {
-      throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
deleted file mode 100644
index b30a9d9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- *
- */
-public class BeamFilterRule extends ConverterRule {
-  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
-  private BeamFilterRule() {
-    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Filter filter = (Filter) rel;
-    final RelNode input = filter.getInput();
-
-    return new BeamFilterRel(filter.getCluster(),
-        filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        filter.getCondition());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index 54079b0..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamIOSinkRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
-public class BeamIOSinkRule extends ConverterRule {
-  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
-  private BeamIOSinkRule() {
-    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSinkRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableModify tableModify = (TableModify) rel;
-    final RelNode input = tableModify.getInput();
-
-    final RelOptCluster cluster = tableModify.getCluster();
-    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
-    final RelOptTable relOptTable = tableModify.getTable();
-    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-    final RelNode convertedInput = convert(input,
-        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
-    final TableModify.Operation operation = tableModify.getOperation();
-    final List<String> updateColumnList = tableModify.getUpdateColumnList();
-    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-    final boolean flattened = tableModify.isFlattened();
-
-    final Table table = tableModify.getTable().unwrap(Table.class);
-
-    switch (table.getJdbcTableType()) {
-    case TABLE:
-    case STREAM:
-      if (operation != TableModify.Operation.INSERT) {
-        throw new UnsupportedOperationException(
-            String.format("Streams doesn't support %s modify operation", operation));
-      }
-      return new BeamIOSinkRel(cluster, traitSet,
-          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
-          sourceExpressionList, flattened);
-    default:
-      throw new IllegalArgumentException(
-          String.format("Unsupported table type: %s", table.getJdbcTableType()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 496b977..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamIOSourceRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
-public class BeamIOSourceRule extends ConverterRule {
-  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
-  private BeamIOSourceRule() {
-    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSourceRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableScan scan = (TableScan) rel;
-
-    return new BeamIOSourceRel(scan.getCluster(),
-        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
deleted file mode 100644
index 6fdbd9b..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamIntersectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import java.util.List;
-import org.apache.beam.sdk.extensions.sql.rel.BeamIntersectRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-
-/**
- * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
- */
-public class BeamIntersectRule extends ConverterRule {
-  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
-  private BeamIntersectRule() {
-    super(LogicalIntersect.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Intersect intersect = (Intersect) rel;
-    final List<RelNode> inputs = intersect.getInputs();
-    return new BeamIntersectRel(
-        intersect.getCluster(),
-        intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(inputs, BeamLogicalConvention.INSTANCE),
-        intersect.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
deleted file mode 100644
index 147932e..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/rule/BeamJoinRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.rule;
-
-import org.apache.beam.sdk.extensions.sql.rel.BeamJoinRel;
-import org.apache.beam.sdk.extensions.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.logical.LogicalJoin;
-
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
-public class BeamJoinRule extends ConverterRule {
-  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
-  private BeamJoinRule() {
-    super(LogicalJoin.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Join join = (Join) rel;
-    return new BeamJoinRel(
-        join.getCluster(),
-        join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(join.getLeft(),
-            join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        convert(join.getRight(),
-            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        join.getCondition(),
-        join.getVariablesSet(),
-        join.getJoinType()
-    );
-  }
-}