You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:01 UTC

[38/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
deleted file mode 100644
index ba344df..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.io.Serializable;
-import java.lang.reflect.Type;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Top;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelCollation;
-import org.apache.calcite.rel.RelCollationImpl;
-import org.apache.calcite.rel.RelFieldCollation;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * {@code BeamRelNode} to replace a {@code Sort} node.
- *
- * <p>Since Beam does not fully supported global sort we are using {@link Top} to implement
- * the {@code Sort} algebra. The following types of ORDER BY are supported:
-
- * <pre>{@code
- *     select * from t order by id desc limit 10;
- *     select * from t order by id desc limit 10, 5;
- * }</pre>
- *
- * <p>but Order BY without a limit is NOT supported:
- *
- * <pre>{@code
- *   select * from t order by id desc
- * }</pre>
- *
- * <h3>Constraints</h3>
- * <ul>
- *   <li>Due to the constraints of {@link Top}, the result of a `ORDER BY LIMIT`
- *   must fit into the memory of a single machine.</li>
- *   <li>Since `WINDOW`(HOP, TUMBLE, SESSION etc) is always associated with `GroupBy`,
- *   it does not make much sense to use `ORDER BY` with `WINDOW`.
- *   </li>
- * </ul>
- */
-public class BeamSortRel extends Sort implements BeamRelNode {
-  private List<Integer> fieldIndices = new ArrayList<>();
-  private List<Boolean> orientation = new ArrayList<>();
-  private List<Boolean> nullsFirst = new ArrayList<>();
-
-  private int startIndex = 0;
-  private int count;
-
-  public BeamSortRel(
-      RelOptCluster cluster,
-      RelTraitSet traits,
-      RelNode child,
-      RelCollation collation,
-      RexNode offset,
-      RexNode fetch) {
-    super(cluster, traits, child, collation, offset, fetch);
-
-    List<RexNode> fieldExps = getChildExps();
-    RelCollationImpl collationImpl = (RelCollationImpl) collation;
-    List<RelFieldCollation> collations = collationImpl.getFieldCollations();
-    for (int i = 0; i < fieldExps.size(); i++) {
-      RexNode fieldExp = fieldExps.get(i);
-      RexInputRef inputRef = (RexInputRef) fieldExp;
-      fieldIndices.add(inputRef.getIndex());
-      orientation.add(collations.get(i).getDirection() == RelFieldCollation.Direction.ASCENDING);
-
-      RelFieldCollation.NullDirection rawNullDirection = collations.get(i).nullDirection;
-      if (rawNullDirection == RelFieldCollation.NullDirection.UNSPECIFIED) {
-        rawNullDirection = collations.get(i).getDirection().defaultNullDirection();
-      }
-      nullsFirst.add(rawNullDirection == RelFieldCollation.NullDirection.FIRST);
-    }
-
-    if (fetch == null) {
-      throw new UnsupportedOperationException("ORDER BY without a LIMIT is not supported!");
-    }
-
-    RexLiteral fetchLiteral = (RexLiteral) fetch;
-    count = ((BigDecimal) fetchLiteral.getValue()).intValue();
-
-    if (offset != null) {
-      RexLiteral offsetLiteral = (RexLiteral) offset;
-      startIndex = ((BigDecimal) offsetLiteral.getValue()).intValue();
-    }
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    RelNode input = getInput();
-    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(inputPCollections, sqlEnv);
-    Type windowType = upstream.getWindowingStrategy().getWindowFn()
-        .getWindowTypeDescriptor().getType();
-    if (!windowType.equals(GlobalWindow.class)) {
-      throw new UnsupportedOperationException(
-          "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
-    }
-
-    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
-        nullsFirst);
-    // first find the top (offset + count)
-    PCollection<List<BeamSqlRow>> rawStream =
-        upstream.apply("extractTopOffsetAndFetch",
-            Top.of(startIndex + count, comparator).withoutDefaults())
-        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-
-    // strip the `leading offset`
-    if (startIndex > 0) {
-      rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
-          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
-    }
-
-    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
-        "flatten", Flatten.<BeamSqlRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRowType(getRowType())));
-
-    return orderedStream;
-  }
-
-  private static class SubListFn<T> extends DoFn<List<T>, List<T>> {
-    private int startIndex;
-    private int endIndex;
-
-    public SubListFn(int startIndex, int endIndex) {
-      this.startIndex = startIndex;
-      this.endIndex = endIndex;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext ctx) {
-      ctx.output(ctx.element().subList(startIndex, endIndex));
-    }
-  }
-
-  @Override public Sort copy(RelTraitSet traitSet, RelNode newInput, RelCollation newCollation,
-      RexNode offset, RexNode fetch) {
-    return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
-  }
-
-  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
-    private List<Integer> fieldsIndices;
-    private List<Boolean> orientation;
-    private List<Boolean> nullsFirst;
-
-    public BeamSqlRowComparator(List<Integer> fieldsIndices,
-        List<Boolean> orientation,
-        List<Boolean> nullsFirst) {
-      this.fieldsIndices = fieldsIndices;
-      this.orientation = orientation;
-      this.nullsFirst = nullsFirst;
-    }
-
-    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
-      for (int i = 0; i < fieldsIndices.size(); i++) {
-        int fieldIndex = fieldsIndices.get(i);
-        int fieldRet = 0;
-        SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
-        // whether NULL should be ordered first or last(compared to non-null values) depends on
-        // what user specified in SQL(NULLS FIRST/NULLS LAST)
-        if (row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
-          continue;
-        } else if (row1.isNull(fieldIndex) && !row2.isNull(fieldIndex)) {
-          fieldRet = -1 * (nullsFirst.get(i) ? -1 : 1);
-        } else if (!row1.isNull(fieldIndex) && row2.isNull(fieldIndex)) {
-          fieldRet = 1 * (nullsFirst.get(i) ? -1 : 1);
-        } else {
-          switch (fieldType) {
-            case TINYINT:
-              fieldRet = numberCompare(row1.getByte(fieldIndex), row2.getByte(fieldIndex));
-              break;
-            case SMALLINT:
-              fieldRet = numberCompare(row1.getShort(fieldIndex), row2.getShort(fieldIndex));
-              break;
-            case INTEGER:
-              fieldRet = numberCompare(row1.getInteger(fieldIndex), row2.getInteger(fieldIndex));
-              break;
-            case BIGINT:
-              fieldRet = numberCompare(row1.getLong(fieldIndex), row2.getLong(fieldIndex));
-              break;
-            case FLOAT:
-              fieldRet = numberCompare(row1.getFloat(fieldIndex), row2.getFloat(fieldIndex));
-              break;
-            case DOUBLE:
-              fieldRet = numberCompare(row1.getDouble(fieldIndex), row2.getDouble(fieldIndex));
-              break;
-            case VARCHAR:
-              fieldRet = row1.getString(fieldIndex).compareTo(row2.getString(fieldIndex));
-              break;
-            case DATE:
-              fieldRet = row1.getDate(fieldIndex).compareTo(row2.getDate(fieldIndex));
-              break;
-            default:
-              throw new UnsupportedOperationException(
-                  "Data type: " + fieldType + " not supported yet!");
-          }
-        }
-
-        fieldRet *= (orientation.get(i) ? -1 : 1);
-        if (fieldRet != 0) {
-          return fieldRet;
-        }
-      }
-      return 0;
-    }
-  }
-
-  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
-    return a.compareTo(b);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
deleted file mode 100644
index 9f1f703..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSqlRelUtils.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-class BeamSqlRelUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
-
-  private static final AtomicInteger sequence = new AtomicInteger(0);
-  private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-  public static String getStageName(BeamRelNode relNode) {
-    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
-        + sequence.getAndIncrement();
-  }
-
-  public static String getClassName(BeamRelNode relNode) {
-    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
-        + "_" + classSequence.getAndIncrement();
-  }
-
-  public static BeamRelNode getBeamRelInput(RelNode input) {
-    if (input instanceof RelSubset) {
-      // go with known best input
-      input = ((RelSubset) input).getBest();
-    }
-    return (BeamRelNode) input;
-  }
-
-  public static String explain(final RelNode rel) {
-    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-  }
-
-  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-    String explain = "";
-    try {
-      explain = RelOptUtil.toString(rel);
-    } catch (StackOverflowError e) {
-      LOG.error("StackOverflowError occurred while extracting plan. "
-          + "Please report it to the dev@ mailing list.");
-      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-      LOG.error("Forcing plan to empty string and continue... "
-          + "SQL Runner may not working properly after.");
-    }
-    return explain;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
deleted file mode 100644
index c661585..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamUnionRel.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.SetOp;
-import org.apache.calcite.rel.core.Union;
-
-/**
- * {@link BeamRelNode} to replace a {@link Union}.
- *
- * <p>{@code BeamUnionRel} needs the input of it have the same {@link WindowFn}. From the SQL
- * perspective, two cases are supported:
- *
- * <p>1) Do not use {@code grouped window function}:
- *
- * <pre>{@code
- *   select * from person UNION select * from person
- * }</pre>
- *
- * <p>2) Use the same {@code grouped window function}, with the same param:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- * }</pre>
- *
- * <p>Inputs with different group functions are NOT supported:
- * <pre>{@code
- *   select id, count(*) from person
- *   group by id, TUMBLE(order_time, INTERVAL '1' HOUR)
- *   UNION
- *   select * from person
- *   group by id, TUMBLE(order_time, INTERVAL '2' HOUR)
- * }</pre>
- */
-public class BeamUnionRel extends Union implements BeamRelNode {
-  private BeamSetOperatorRelBase delegate;
-  public BeamUnionRel(RelOptCluster cluster,
-      RelTraitSet traits,
-      List<RelNode> inputs,
-      boolean all) {
-    super(cluster, traits, inputs, all);
-    this.delegate = new BeamSetOperatorRelBase(this,
-        BeamSetOperatorRelBase.OpType.UNION,
-        inputs, all);
-  }
-
-  public BeamUnionRel(RelInput input) {
-    super(input);
-  }
-
-  @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
-    return new BeamUnionRel(getCluster(), traitSet, inputs, all);
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    return delegate.buildBeamPipeline(inputPCollections, sqlEnv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
deleted file mode 100644
index 43b74c3..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rel;
-
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.schema.BeamSqlRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowType;
-import org.apache.beam.dsls.sql.schema.BeamTableUtils;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexLiteral;
-
-/**
- * {@code BeamRelNode} to replace a {@code Values} node.
- *
- * <p>{@code BeamValuesRel} will be used in the following SQLs:
- * <ul>
- *   <li>{@code insert into t (name, desc) values ('hello', 'world')}</li>
- *   <li>{@code select 1, '1', LOCALTIME}</li>
- * </ul>
- */
-public class BeamValuesRel extends Values implements BeamRelNode {
-
-  public BeamValuesRel(
-      RelOptCluster cluster,
-      RelDataType rowType,
-      ImmutableList<ImmutableList<RexLiteral>> tuples,
-      RelTraitSet traits) {
-    super(cluster, rowType, tuples, traits);
-
-  }
-
-  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections
-      , BeamSqlEnv sqlEnv) throws Exception {
-    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
-    String stageName = BeamSqlRelUtils.getStageName(this);
-    if (tuples.isEmpty()) {
-      throw new IllegalStateException("Values with empty tuples!");
-    }
-
-    BeamSqlRowType beamSQLRowType = CalciteUtils.toBeamRowType(this.getRowType());
-    for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSqlRow row = new BeamSqlRow(beamSQLRowType);
-      for (int i = 0; i < tuple.size(); i++) {
-        BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
-      }
-      rows.add(row);
-    }
-
-    return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
-        .setCoder(new BeamSqlRowCoder(beamSQLRowType));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
deleted file mode 100644
index 77d6204..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL specified nodes, to replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.apache.beam.dsls.sql.rel;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
deleted file mode 100644
index 6e843d4..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamAggregationRule.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import com.google.common.collect.ImmutableList;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.dsls.sql.rel.BeamAggregationRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.core.RelFactories;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.tools.RelBuilderFactory;
-import org.apache.calcite.util.ImmutableBitSet;
-import org.joda.time.Duration;
-
-/**
- * Rule to detect the window/trigger settings.
- *
- */
-public class BeamAggregationRule extends RelOptRule {
-  public static final BeamAggregationRule INSTANCE =
-      new BeamAggregationRule(Aggregate.class, Project.class, RelFactories.LOGICAL_BUILDER);
-
-  public BeamAggregationRule(
-      Class<? extends Aggregate> aggregateClass,
-      Class<? extends Project> projectClass,
-      RelBuilderFactory relBuilderFactory) {
-    super(
-        operand(aggregateClass,
-            operand(projectClass, any())),
-        relBuilderFactory, null);
-  }
-
-  public BeamAggregationRule(RelOptRuleOperand operand, String description) {
-    super(operand, description);
-  }
-
-  @Override
-  public void onMatch(RelOptRuleCall call) {
-    final Aggregate aggregate = call.rel(0);
-    final Project project = call.rel(1);
-    updateWindowTrigger(call, aggregate, project);
-  }
-
-  private void updateWindowTrigger(RelOptRuleCall call, Aggregate aggregate,
-      Project project) {
-    ImmutableBitSet groupByFields = aggregate.getGroupSet();
-    List<RexNode> projectMapping = project.getProjects();
-
-    WindowFn windowFn = new GlobalWindows();
-    Trigger triggerFn = Repeatedly.forever(AfterWatermark.pastEndOfWindow());
-    int windowFieldIdx = -1;
-    Duration allowedLatence = Duration.ZERO;
-
-    for (int groupField : groupByFields.asList()) {
-      RexNode projNode = projectMapping.get(groupField);
-      if (projNode instanceof RexCall) {
-        SqlOperator op = ((RexCall) projNode).op;
-        ImmutableList<RexNode> parameters = ((RexCall) projNode).operands;
-        String functionName = op.getName();
-        switch (functionName) {
-        case "TUMBLE":
-          windowFieldIdx = groupField;
-          windowFn = FixedWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          if (parameters.size() == 3) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        case "HOP":
-          windowFieldIdx = groupField;
-          windowFn = SlidingWindows
-              .of(Duration.millis(getWindowParameterAsMillis(parameters.get(1))))
-              .every(Duration.millis(getWindowParameterAsMillis(parameters.get(2))));
-          if (parameters.size() == 4) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(3))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        case "SESSION":
-          windowFieldIdx = groupField;
-          windowFn = Sessions
-              .withGapDuration(Duration.millis(getWindowParameterAsMillis(parameters.get(1))));
-          if (parameters.size() == 3) {
-            GregorianCalendar delayTime = (GregorianCalendar) ((RexLiteral) parameters.get(2))
-                .getValue();
-            triggerFn = createTriggerWithDelay(delayTime);
-            allowedLatence = (Duration.millis(delayTime.getTimeInMillis()));
-          }
-          break;
-        default:
-          break;
-        }
-      }
-    }
-
-    BeamAggregationRel newAggregator = new BeamAggregationRel(aggregate.getCluster(),
-        aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(aggregate.getInput(),
-            aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        aggregate.indicator,
-        aggregate.getGroupSet(),
-        aggregate.getGroupSets(),
-        aggregate.getAggCallList(),
-        windowFn,
-        triggerFn,
-        windowFieldIdx,
-        allowedLatence);
-    call.transformTo(newAggregator);
-  }
-
-  private Trigger createTriggerWithDelay(GregorianCalendar delayTime) {
-    return Repeatedly.forever(AfterWatermark.pastEndOfWindow().withLateFirings(AfterProcessingTime
-        .pastFirstElementInPane().plusDelayOf(Duration.millis(delayTime.getTimeInMillis()))));
-  }
-
-  private long getWindowParameterAsMillis(RexNode parameterNode) {
-    if (parameterNode instanceof RexLiteral) {
-      return RexLiteral.intValue(parameterNode);
-    } else {
-      throw new IllegalArgumentException(String.format("[%s] is not valid.", parameterNode));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
deleted file mode 100644
index 414b666..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- *
- */
-public class BeamFilterRule extends ConverterRule {
-  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
-  private BeamFilterRule() {
-    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Filter filter = (Filter) rel;
-    final RelNode input = filter.getInput();
-
-    return new BeamFilterRel(filter.getCluster(),
-        filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        filter.getCondition());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index 4cc4ef5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamIOSinkRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
-public class BeamIOSinkRule extends ConverterRule {
-  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
-  private BeamIOSinkRule() {
-    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSinkRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableModify tableModify = (TableModify) rel;
-    final RelNode input = tableModify.getInput();
-
-    final RelOptCluster cluster = tableModify.getCluster();
-    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
-    final RelOptTable relOptTable = tableModify.getTable();
-    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-    final RelNode convertedInput = convert(input,
-        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
-    final TableModify.Operation operation = tableModify.getOperation();
-    final List<String> updateColumnList = tableModify.getUpdateColumnList();
-    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-    final boolean flattened = tableModify.isFlattened();
-
-    final Table table = tableModify.getTable().unwrap(Table.class);
-
-    switch (table.getJdbcTableType()) {
-    case TABLE:
-    case STREAM:
-      if (operation != TableModify.Operation.INSERT) {
-        throw new UnsupportedOperationException(
-            String.format("Streams doesn't support %s modify operation", operation));
-      }
-      return new BeamIOSinkRel(cluster, traitSet,
-          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
-          sourceExpressionList, flattened);
-    default:
-      throw new IllegalArgumentException(
-          String.format("Unsupported table type: %s", table.getJdbcTableType()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 85a69ff..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamIOSourceRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
-public class BeamIOSourceRule extends ConverterRule {
-  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
-  private BeamIOSourceRule() {
-    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSourceRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableScan scan = (TableScan) rel;
-
-    return new BeamIOSourceRel(scan.getCluster(),
-        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
deleted file mode 100644
index 70716c5..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamIntersectRule.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamIntersectRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Intersect;
-import org.apache.calcite.rel.logical.LogicalIntersect;
-
-/**
- * {@code ConverterRule} to replace {@code Intersect} with {@code BeamIntersectRel}.
- */
-public class BeamIntersectRule extends ConverterRule {
-  public static final BeamIntersectRule INSTANCE = new BeamIntersectRule();
-  private BeamIntersectRule() {
-    super(LogicalIntersect.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamIntersectRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Intersect intersect = (Intersect) rel;
-    final List<RelNode> inputs = intersect.getInputs();
-    return new BeamIntersectRel(
-        intersect.getCluster(),
-        intersect.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(inputs, BeamLogicalConvention.INSTANCE),
-        intersect.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
deleted file mode 100644
index 78253fe..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamJoinRule.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamJoinRel;
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.logical.LogicalJoin;
-
-/**
- * {@code ConverterRule} to replace {@code Join} with {@code BeamJoinRel}.
- */
-public class BeamJoinRule extends ConverterRule {
-  public static final BeamJoinRule INSTANCE = new BeamJoinRule();
-  private BeamJoinRule() {
-    super(LogicalJoin.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamJoinRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Join join = (Join) rel;
-    return new BeamJoinRel(
-        join.getCluster(),
-        join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(join.getLeft(),
-            join.getLeft().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        convert(join.getRight(),
-            join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        join.getCondition(),
-        join.getVariablesSet(),
-        join.getJoinType()
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
deleted file mode 100644
index ca93c71..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamMinusRule.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import java.util.List;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamMinusRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Minus;
-import org.apache.calcite.rel.logical.LogicalMinus;
-
-/**
- * {@code ConverterRule} to replace {@code Minus} with {@code BeamMinusRel}.
- */
-public class BeamMinusRule extends ConverterRule {
-  public static final BeamMinusRule INSTANCE = new BeamMinusRule();
-  private BeamMinusRule() {
-    super(LogicalMinus.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamMinusRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Minus minus = (Minus) rel;
-    final List<RelNode> inputs = minus.getInputs();
-    return new BeamMinusRel(
-        minus.getCluster(),
-        minus.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(inputs, BeamLogicalConvention.INSTANCE),
-        minus.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
deleted file mode 100644
index 6dc3b57..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamProjectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
-public class BeamProjectRule extends ConverterRule {
-  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
-
-  private BeamProjectRule() {
-    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Project project = (Project) rel;
-    final RelNode input = project.getInput();
-
-    return new BeamProjectRel(project.getCluster(),
-        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        project.getProjects(), project.getRowType());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
deleted file mode 100644
index d802e9d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamSortRule.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-
-import org.apache.beam.dsls.sql.rel.BeamSortRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Sort;
-import org.apache.calcite.rel.logical.LogicalSort;
-
-/**
- * {@code ConverterRule} to replace {@code Sort} with {@code BeamSortRel}.
- */
-public class BeamSortRule extends ConverterRule {
-  public static final BeamSortRule INSTANCE = new BeamSortRule();
-  private BeamSortRule() {
-    super(LogicalSort.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamSortRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Sort sort = (Sort) rel;
-    final RelNode input = sort.getInput();
-    return new BeamSortRel(
-        sort.getCluster(),
-        sort.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        sort.getCollation(),
-        sort.offset,
-        sort.fetch
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
deleted file mode 100644
index b8430b9..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamUnionRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamUnionRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Union;
-import org.apache.calcite.rel.logical.LogicalUnion;
-
-/**
- * A {@code ConverterRule} to replace {@link org.apache.calcite.rel.core.Union} with
- * {@link BeamUnionRule}.
- */
-public class BeamUnionRule extends ConverterRule {
-  public static final BeamUnionRule INSTANCE = new BeamUnionRule();
-  private BeamUnionRule() {
-    super(LogicalUnion.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamUnionRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Union union = (Union) rel;
-
-    return new BeamUnionRel(
-        union.getCluster(),
-        union.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convertList(union.getInputs(), BeamLogicalConvention.INSTANCE),
-        union.all
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
deleted file mode 100644
index 4ea9e60..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/BeamValuesRule.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.rule;
-
-import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.apache.beam.dsls.sql.rel.BeamValuesRel;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Values;
-import org.apache.calcite.rel.logical.LogicalValues;
-
-/**
- * {@code ConverterRule} to replace {@code Values} with {@code BeamValuesRel}.
- */
-public class BeamValuesRule extends ConverterRule {
-  public static final BeamValuesRule INSTANCE = new BeamValuesRule();
-  private BeamValuesRule() {
-    super(LogicalValues.class, Convention.NONE,
-        BeamLogicalConvention.INSTANCE, "BeamValuesRule");
-  }
-
-  @Override public RelNode convert(RelNode rel) {
-    Values values = (Values) rel;
-    return new BeamValuesRel(
-        values.getCluster(),
-        values.getRowType(),
-        values.getTuples(),
-        values.getTraitSet().replace(BeamLogicalConvention.INSTANCE)
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
deleted file mode 100644
index 5d32647..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/rule/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.calcite.plan.RelOptRule} to generate
- * {@link org.apache.beam.dsls.sql.rel.BeamRelNode}.
- */
-package org.apache.beam.dsls.sql.rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
deleted file mode 100644
index dfa2785..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements BeamSqlTable, Serializable {
-  protected BeamSqlRowType beamSqlRowType;
-  public BaseBeamTable(BeamSqlRowType beamSqlRowType) {
-    this.beamSqlRowType = beamSqlRowType;
-  }
-
-  @Override public BeamSqlRowType getRowType() {
-    return beamSqlRowType;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
deleted file mode 100644
index 502e8c1..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type as a source IO, determined whether it's a STREAMING process, or batch
- * process.
- */
-public enum BeamIOType implements Serializable {
-  BOUNDED, UNBOUNDED;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
deleted file mode 100644
index 5b63780..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
- * then a downstream query can query directly.
- */
-public class BeamPCollectionTable extends BaseBeamTable {
-  private BeamIOType ioType;
-  private transient PCollection<BeamSqlRow> upstream;
-
-  protected BeamPCollectionTable(BeamSqlRowType beamSqlRowType) {
-    super(beamSqlRowType);
-  }
-
-  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream,
-      BeamSqlRowType beamSqlRowType){
-    this(beamSqlRowType);
-    ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
-        ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
-    this.upstream = upstream;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return ioType;
-  }
-
-  @Override
-  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
-    return upstream;
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
-    throw new IllegalArgumentException("cannot use [BeamPCollectionTable] as target");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
deleted file mode 100644
index d789446..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.Instant;
-
-/**
- * Represent a generic ROW record in Beam SQL.
- *
- */
-public class BeamSqlRow implements Serializable {
-  private static final Map<Integer, Class> SQL_TYPE_TO_JAVA_CLASS = new HashMap<>();
-  static {
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TINYINT, Byte.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.SMALLINT, Short.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.INTEGER, Integer.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BIGINT, Long.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.FLOAT, Float.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DOUBLE, Double.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DECIMAL, BigDecimal.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.BOOLEAN, Boolean.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.CHAR, String.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.VARCHAR, String.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIME, GregorianCalendar.class);
-
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.DATE, Date.class);
-    SQL_TYPE_TO_JAVA_CLASS.put(Types.TIMESTAMP, Date.class);
-  }
-
-  private List<Integer> nullFields = new ArrayList<>();
-  private List<Object> dataValues;
-  private BeamSqlRowType dataType;
-
-  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
-  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
-  public BeamSqlRow(BeamSqlRowType dataType) {
-    this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
-    for (int idx = 0; idx < dataType.size(); ++idx) {
-      dataValues.add(null);
-      nullFields.add(idx);
-    }
-  }
-
-  public BeamSqlRow(BeamSqlRowType dataType, List<Object> dataValues) {
-    this(dataType);
-    for (int idx = 0; idx < dataValues.size(); ++idx) {
-      addField(idx, dataValues.get(idx));
-    }
-  }
-
-  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
-    windowStart = upstreamRecord.windowStart;
-    windowEnd = upstreamRecord.windowEnd;
-
-    if (window instanceof IntervalWindow) {
-      IntervalWindow iWindow = (IntervalWindow) window;
-      windowStart = iWindow.start();
-      windowEnd = iWindow.end();
-    }
-  }
-
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
-  }
-
-  public void addField(int index, Object fieldValue) {
-    if (fieldValue == null) {
-      return;
-    } else {
-      if (nullFields.contains(index)) {
-        nullFields.remove(nullFields.indexOf(index));
-      }
-    }
-
-    validateValueType(index, fieldValue);
-    dataValues.set(index, fieldValue);
-  }
-
-  private void validateValueType(int index, Object fieldValue) {
-    SqlTypeName fieldType = CalciteUtils.getFieldType(dataType, index);
-    Class javaClazz = SQL_TYPE_TO_JAVA_CLASS.get(CalciteUtils.toJavaType(fieldType));
-    if (javaClazz == null) {
-      throw new UnsupportedOperationException("Data type: " + fieldType + " not supported yet!");
-    }
-
-    if (!fieldValue.getClass().equals(javaClazz)) {
-      throw new IllegalArgumentException(
-          String.format("[%s](%s) doesn't match type [%s]",
-              fieldValue, fieldValue.getClass(), fieldType)
-      );
-    }
-  }
-
-  public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
-  }
-
-  public byte getByte(String fieldName) {
-    return (Byte) getFieldValue(fieldName);
-  }
-
-  public short getShort(String fieldName) {
-    return (Short) getFieldValue(fieldName);
-  }
-
-  public int getInteger(String fieldName) {
-    return (Integer) getFieldValue(fieldName);
-  }
-
-  public float getFloat(String fieldName) {
-    return (Float) getFieldValue(fieldName);
-  }
-
-  public double getDouble(String fieldName) {
-    return (Double) getFieldValue(fieldName);
-  }
-
-  public long getLong(String fieldName) {
-    return (Long) getFieldValue(fieldName);
-  }
-
-  public String getString(String fieldName) {
-    return (String) getFieldValue(fieldName);
-  }
-
-  public Date getDate(String fieldName) {
-    return (Date) getFieldValue(fieldName);
-  }
-
-  public GregorianCalendar getGregorianCalendar(String fieldName) {
-    return (GregorianCalendar) getFieldValue(fieldName);
-  }
-
-  public BigDecimal getBigDecimal(String fieldName) {
-    return (BigDecimal) getFieldValue(fieldName);
-  }
-
-  public boolean getBoolean(String fieldName) {
-    return (boolean) getFieldValue(fieldName);
-  }
-
-  public Object getFieldValue(int fieldIdx) {
-    if (nullFields.contains(fieldIdx)) {
-      return null;
-    }
-
-    return dataValues.get(fieldIdx);
-  }
-
-  public byte getByte(int idx) {
-    return (Byte) getFieldValue(idx);
-  }
-
-  public short getShort(int idx) {
-    return (Short) getFieldValue(idx);
-  }
-
-  public int getInteger(int idx) {
-    return (Integer) getFieldValue(idx);
-  }
-
-  public float getFloat(int idx) {
-    return (Float) getFieldValue(idx);
-  }
-
-  public double getDouble(int idx) {
-    return (Double) getFieldValue(idx);
-  }
-
-  public long getLong(int idx) {
-    return (Long) getFieldValue(idx);
-  }
-
-  public String getString(int idx) {
-    return (String) getFieldValue(idx);
-  }
-
-  public Date getDate(int idx) {
-    return (Date) getFieldValue(idx);
-  }
-
-  public GregorianCalendar getGregorianCalendar(int idx) {
-    return (GregorianCalendar) getFieldValue(idx);
-  }
-
-  public BigDecimal getBigDecimal(int idx) {
-    return (BigDecimal) getFieldValue(idx);
-  }
-
-  public boolean getBoolean(int idx) {
-    return (boolean) getFieldValue(idx);
-  }
-
-  public int size() {
-    return dataValues.size();
-  }
-
-  public List<Object> getDataValues() {
-    return dataValues;
-  }
-
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
-  public BeamSqlRowType getDataType() {
-    return dataType;
-  }
-
-  public void setDataType(BeamSqlRowType dataType) {
-    this.dataType = dataType;
-  }
-
-  public void setNullFields(List<Integer> nullFields) {
-    this.nullFields = nullFields;
-  }
-
-  public List<Integer> getNullFields() {
-    return nullFields;
-  }
-
-  /**
-   * is the specified field NULL?
-   */
-  public boolean isNull(int idx) {
-    return nullFields.contains(idx);
-  }
-
-  public Instant getWindowStart() {
-    return windowStart;
-  }
-
-  public Instant getWindowEnd() {
-    return windowEnd;
-  }
-
-  public void setWindowStart(Instant windowStart) {
-    this.windowStart = windowStart;
-  }
-
-  public void setWindowEnd(Instant windowEnd) {
-    this.windowEnd = windowEnd;
-  }
-
-  @Override
-  public String toString() {
-    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
-        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
-  }
-
-  /**
-   * Return data fields as key=value.
-   */
-  public String valueInString() {
-    StringBuilder sb = new StringBuilder();
-    for (int idx = 0; idx < size(); ++idx) {
-      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
-    }
-    return sb.substring(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    BeamSqlRow other = (BeamSqlRow) obj;
-    return toString().equals(other.toString());
-  }
-
-  @Override public int hashCode() {
-    return 31 * (31 * dataType.hashCode() + dataValues.hashCode()) + nullFields.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index f14864a..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.BigDecimalCoder;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CustomCoder;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- *  A {@link Coder} encodes {@link BeamSqlRow}.
- */
-public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
-  private BeamSqlRowType tableSchema;
-
-  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
-
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
-  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-  private static final DoubleCoder doubleCoder = DoubleCoder.of();
-  private static final InstantCoder instantCoder = InstantCoder.of();
-  private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
-  private static final ByteCoder byteCoder = ByteCoder.of();
-
-  public BeamSqlRowCoder(BeamSqlRowType tableSchema) {
-    this.tableSchema = tableSchema;
-  }
-
-  @Override
-  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
-    listCoder.encode(value.getNullFields(), outStream);
-    for (int idx = 0; idx < value.size(); ++idx) {
-      if (value.getNullFields().contains(idx)) {
-        continue;
-      }
-
-      switch (CalciteUtils.getFieldType(value.getDataType(), idx)) {
-        case INTEGER:
-          intCoder.encode(value.getInteger(idx), outStream);
-          break;
-        case SMALLINT:
-          intCoder.encode((int) value.getShort(idx), outStream);
-          break;
-        case TINYINT:
-          byteCoder.encode(value.getByte(idx), outStream);
-          break;
-        case DOUBLE:
-          doubleCoder.encode(value.getDouble(idx), outStream);
-          break;
-        case FLOAT:
-          doubleCoder.encode((double) value.getFloat(idx), outStream);
-          break;
-        case DECIMAL:
-          bigDecimalCoder.encode(value.getBigDecimal(idx), outStream);
-          break;
-        case BIGINT:
-          longCoder.encode(value.getLong(idx), outStream);
-          break;
-        case VARCHAR:
-        case CHAR:
-          stringCoder.encode(value.getString(idx), outStream);
-          break;
-        case TIME:
-          longCoder.encode(value.getGregorianCalendar(idx).getTime().getTime(), outStream);
-          break;
-        case DATE:
-        case TIMESTAMP:
-          longCoder.encode(value.getDate(idx).getTime(), outStream);
-          break;
-        case BOOLEAN:
-          byteCoder.encode((byte) (value.getBoolean(idx) ? 1 : 0), outStream);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Data type: " + value.getDataType().getFieldsType().get(idx) + " not supported yet!");
-      }
-    }
-
-    instantCoder.encode(value.getWindowStart(), outStream);
-    instantCoder.encode(value.getWindowEnd(), outStream);
-  }
-
-  @Override
-  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
-    List<Integer> nullFields = listCoder.decode(inStream);
-
-    BeamSqlRow record = new BeamSqlRow(tableSchema);
-    record.setNullFields(nullFields);
-    for (int idx = 0; idx < tableSchema.size(); ++idx) {
-      if (nullFields.contains(idx)) {
-        continue;
-      }
-
-      switch (CalciteUtils.getFieldType(tableSchema, idx)) {
-        case INTEGER:
-          record.addField(idx, intCoder.decode(inStream));
-          break;
-        case SMALLINT:
-          record.addField(idx, intCoder.decode(inStream).shortValue());
-          break;
-        case TINYINT:
-          record.addField(idx, byteCoder.decode(inStream));
-          break;
-        case DOUBLE:
-          record.addField(idx, doubleCoder.decode(inStream));
-          break;
-        case FLOAT:
-          record.addField(idx, doubleCoder.decode(inStream).floatValue());
-          break;
-        case BIGINT:
-          record.addField(idx, longCoder.decode(inStream));
-          break;
-        case DECIMAL:
-          record.addField(idx, bigDecimalCoder.decode(inStream));
-          break;
-        case VARCHAR:
-        case CHAR:
-          record.addField(idx, stringCoder.decode(inStream));
-          break;
-        case TIME:
-          GregorianCalendar calendar = new GregorianCalendar();
-          calendar.setTime(new Date(longCoder.decode(inStream)));
-          record.addField(idx, calendar);
-          break;
-        case DATE:
-        case TIMESTAMP:
-          record.addField(idx, new Date(longCoder.decode(inStream)));
-          break;
-        case BOOLEAN:
-          record.addField(idx, byteCoder.decode(inStream) == 1);
-          break;
-
-        default:
-          throw new UnsupportedOperationException("Data type: "
-              + CalciteUtils.toCalciteType(tableSchema.getFieldsType().get(idx))
-              + " not supported yet!");
-      }
-    }
-
-    record.setWindowStart(instantCoder.decode(inStream));
-    record.setWindowEnd(instantCoder.decode(inStream));
-
-    return record;
-  }
-
-  public BeamSqlRowType getTableSchema() {
-    return tableSchema;
-  }
-
-  @Override
-  public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
deleted file mode 100644
index 1129bdd..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Field type information in {@link BeamSqlRow}.
- *
- */
-@AutoValue
-public abstract class BeamSqlRowType implements Serializable {
-  public abstract List<String> getFieldsName();
-  public abstract List<Integer> getFieldsType();
-
-  public static BeamSqlRowType create(List<String> fieldNames, List<Integer> fieldTypes) {
-    return new AutoValue_BeamSqlRowType(fieldNames, fieldTypes);
-  }
-
-  public int size() {
-    return getFieldsName().size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
deleted file mode 100644
index d419473..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlTable.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.dsls.sql.schema;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-/**
- * This interface defines a Beam Sql Table.
- */
-public interface BeamSqlTable {
-  /**
-   * In Beam SQL, there's no difference between a batch query and a streaming
-   * query. {@link BeamIOType} is used to validate the sources.
-   */
-  BeamIOType getSourceType();
-
-  /**
-   * create a {@code PCollection<BeamSqlRow>} from source.
-   *
-   */
-  PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
-
-  /**
-   * create a {@code IO.write()} instance to write to target.
-   *
-   */
-   PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
-
-  /**
-   * Get the schema info of the table.
-   */
-   BeamSqlRowType getRowType();
-}