You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/04/18 21:41:00 UTC

[2/5] beam git commit: checkstyle and rename package

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
deleted file mode 100644
index 702381d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/UnsupportedOperatorsVisitor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.planner;
-
-import org.apache.calcite.sql.util.SqlShuttle;
-
-/**
- * Visitor for unsupported SQL operators; currently an empty {@link SqlShuttle} subclass.
- *
- */
-public class UnsupportedOperatorsVisitor extends SqlShuttle {
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
deleted file mode 100644
index d98c584..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/planner/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.beam.dsls.sql.planner.BeamQueryPlanner} is the main interface.
- * It defines data sources, validates a SQL statement, and converts it into a Beam
- * pipeline.
- */
-package org.beam.dsls.sql.planner;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
deleted file mode 100644
index 64f2d1f..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamFilterRel.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.beam.dsls.sql.transform.BeamSQLFilterFn;
-
-/**
- * BeamRelNode to replace a {@code Filter} node.
- *
- */
-public class BeamFilterRel extends Filter implements BeamRelNode {
-
-  public BeamFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child,
-      RexNode condition) {
-    super(cluster, traits, child, condition);
-  }
-
-  @Override
-  public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
-    return new BeamFilterRel(getCluster(), traitSet, input, condition);
-  }
-
-  @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
-
-    RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
-    String stageName = BeamSQLRelUtils.getStageName(this);
-
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
-
-    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
-
-    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName,
-        ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
-
-    planCreator.setLatestStream(projectStream);
-
-    return planCreator.getPipeline();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
deleted file mode 100644
index 46654e5..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * BeamRelNode to replace a {@code TableModify} node.
- *
- */
-public class BeamIOSinkRel extends TableModify implements BeamRelNode {
-  public BeamIOSinkRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table,
-      Prepare.CatalogReader catalogReader, RelNode child, Operation operation,
-      List<String> updateColumnList, List<RexNode> sourceExpressionList, boolean flattened) {
-    super(cluster, traits, table, catalogReader, child, operation, updateColumnList,
-        sourceExpressionList, flattened);
-  }
-
-  @Override
-  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-    return new BeamIOSinkRel(getCluster(), traitSet, getTable(), getCatalogReader(), sole(inputs),
-        getOperation(), getUpdateColumnList(), getSourceExpressionList(), isFlattened());
-  }
-
-  @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
-
-    RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
-    String stageName = BeamSQLRelUtils.getStageName(this);
-
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
-
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
-
-    BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
-
-    upstream.apply(stageName, targetTable.buildIOWriter());
-
-    planCreator.setHasPersistent(true);
-
-    return planCreator.getPipeline();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
deleted file mode 100644
index f14db92..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import com.google.common.base.Joiner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.core.TableScan;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * BeamRelNode to replace a {@code TableScan} node.
- *
- */
-public class BeamIOSourceRel extends TableScan implements BeamRelNode {
-
-  public BeamIOSourceRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
-    super(cluster, traitSet, table);
-  }
-
-  @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
-
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
-
-    BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
-
-    String stageName = BeamSQLRelUtils.getStageName(this);
-
-    PCollection<BeamSQLRow> sourceStream = planCreator.getPipeline().apply(stageName,
-        sourceTable.buildIOReader());
-
-    planCreator.setLatestStream(sourceStream);
-
-    return planCreator.getPipeline();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
deleted file mode 100644
index 50fe8e0..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamLogicalConvention.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTrait;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
-
-/**
- * Convention for Beam SQL.
- *
- */
-public enum BeamLogicalConvention implements Convention {
-  INSTANCE;
-
-  @Override
-  public Class getInterface() {
-    return BeamRelNode.class;
-  }
-
-  @Override
-  public String getName() {
-    return "BEAM_LOGICAL";
-  }
-
-  @Override
-  public RelTraitDef getTraitDef() {
-    return ConventionTraitDef.INSTANCE;
-  }
-
-  @Override
-  public boolean satisfies(RelTrait trait) {
-    return this == trait;
-  }
-
-  @Override
-  public void register(RelOptPlanner planner) {
-  }
-
-  @Override
-  public String toString() {
-    return getName();
-  }
-
-  @Override
-  public boolean canConvertConvention(Convention toConvention) {
-    return false;
-  }
-
-  @Override
-  public boolean useAbstractConvertersForConversion(RelTraitSet fromTraits, RelTraitSet toTraits) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
deleted file mode 100644
index e41d74e..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamProjectRel.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.interpreter.BeamSQLSpELExecutor;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.beam.dsls.sql.transform.BeamSQLProjectFn;
-
-/**
- * BeamRelNode to replace a {@code Project} node.
- *
- */
-public class BeamProjectRel extends Project implements BeamRelNode {
-
-  /**
-   * projects: {@link RexLiteral}, {@link RexInputRef}, {@link RexCall}.
-   *
-   */
-  public BeamProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input,
-      List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traits, input, projects, rowType);
-  }
-
-  @Override
-  public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects,
-      RelDataType rowType) {
-    return new BeamProjectRel(getCluster(), traitSet, input, projects, rowType);
-  }
-
-  @Override
-  public Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception {
-    RelNode input = getInput();
-    BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
-
-    String stageName = BeamSQLRelUtils.getStageName(this);
-
-    PCollection<BeamSQLRow> upstream = planCreator.getLatestStream();
-
-    BeamSQLExpressionExecutor executor = new BeamSQLSpELExecutor(this);
-
-    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
-
-    planCreator.setLatestStream(projectStream);
-
-    return planCreator.getPipeline();
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
deleted file mode 100644
index 07ffee5..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/BeamRelNode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rel;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.calcite.rel.RelNode;
-import org.beam.dsls.sql.planner.BeamPipelineCreator;
-
-/**
- * A {@link RelNode} that adds {@link #buildBeamPipeline(BeamPipelineCreator)}, which is
- * called by {@link BeamPipelineCreator}.
- *
- */
-public interface BeamRelNode extends RelNode {
-
-  /**
-   * A {@link BeamRelNode} is a recursive structure; the
-   * {@link BeamPipelineCreator} visits it with a DFS (depth-first search)
-   * algorithm.
-   *
-   */
-  Pipeline buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
deleted file mode 100644
index 13dc962..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rel/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * BeamSQL-specific nodes that replace {@link org.apache.calcite.rel.RelNode}.
- *
- */
-package org.beam.dsls.sql.rel;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
deleted file mode 100644
index 2ad7c07..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamFilterRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Filter;
-import org.apache.calcite.rel.logical.LogicalFilter;
-import org.beam.dsls.sql.rel.BeamFilterRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link Filter} with {@link BeamFilterRel}.
- *
- */
-public class BeamFilterRule extends ConverterRule {
-  public static final BeamFilterRule INSTANCE = new BeamFilterRule();
-
-  private BeamFilterRule() {
-    super(LogicalFilter.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamFilterRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Filter filter = (Filter) rel;
-    final RelNode input = filter.getInput();
-
-    return new BeamFilterRel(filter.getCluster(),
-        filter.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        filter.getCondition());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
deleted file mode 100644
index a44c002..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSinkRule.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import java.util.List;
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.calcite.rel.logical.LogicalTableModify;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.Table;
-import org.beam.dsls.sql.rel.BeamIOSinkRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link TableModify} with
- * {@link BeamIOSinkRel}.
- *
- */
-public class BeamIOSinkRule extends ConverterRule {
-  public static final BeamIOSinkRule INSTANCE = new BeamIOSinkRule();
-
-  private BeamIOSinkRule() {
-    super(LogicalTableModify.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSinkRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableModify tableModify = (TableModify) rel;
-    final RelNode input = tableModify.getInput();
-
-    final RelOptCluster cluster = tableModify.getCluster();
-    final RelTraitSet traitSet = tableModify.getTraitSet().replace(BeamLogicalConvention.INSTANCE);
-    final RelOptTable relOptTable = tableModify.getTable();
-    final Prepare.CatalogReader catalogReader = tableModify.getCatalogReader();
-    final RelNode convertedInput = convert(input,
-        input.getTraitSet().replace(BeamLogicalConvention.INSTANCE));
-    final TableModify.Operation operation = tableModify.getOperation();
-    final List<String> updateColumnList = tableModify.getUpdateColumnList();
-    final List<RexNode> sourceExpressionList = tableModify.getSourceExpressionList();
-    final boolean flattened = tableModify.isFlattened();
-
-    final Table table = tableModify.getTable().unwrap(Table.class);
-
-    switch (table.getJdbcTableType()) {
-    case TABLE:
-    case STREAM:
-      if (operation != TableModify.Operation.INSERT) {
-        throw new UnsupportedOperationException(
-            String.format("Streams doesn't support %s modify operation", operation));
-      }
-      return new BeamIOSinkRel(cluster, traitSet,
-          relOptTable, catalogReader, convertedInput, operation, updateColumnList,
-          sourceExpressionList, flattened);
-    default:
-      throw new IllegalArgumentException(
-          String.format("Unsupported table type: %s", table.getJdbcTableType()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
deleted file mode 100644
index 9e4778b..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamIOSourceRule.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.logical.LogicalTableScan;
-import org.beam.dsls.sql.rel.BeamIOSourceRel;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-
-/**
- * A {@code ConverterRule} to replace {@link TableScan} with
- * {@link BeamIOSourceRel}.
- *
- */
-public class BeamIOSourceRule extends ConverterRule {
-  public static final BeamIOSourceRule INSTANCE = new BeamIOSourceRule();
-
-  private BeamIOSourceRule() {
-    super(LogicalTableScan.class, Convention.NONE, BeamLogicalConvention.INSTANCE,
-        "BeamIOSourceRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final TableScan scan = (TableScan) rel;
-
-    return new BeamIOSourceRel(scan.getCluster(),
-        scan.getTraitSet().replace(BeamLogicalConvention.INSTANCE), scan.getTable());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
deleted file mode 100644
index 117a056..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/BeamProjectRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.rule;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.convert.ConverterRule;
-import org.apache.calcite.rel.core.Project;
-import org.apache.calcite.rel.logical.LogicalProject;
-import org.beam.dsls.sql.rel.BeamLogicalConvention;
-import org.beam.dsls.sql.rel.BeamProjectRel;
-
-/**
- * A {@code ConverterRule} to replace {@link Project} with
- * {@link BeamProjectRel}.
- *
- */
-public class BeamProjectRule extends ConverterRule {
-  public static final BeamProjectRule INSTANCE = new BeamProjectRule();
-
-  private BeamProjectRule() {
-    super(LogicalProject.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamProjectRule");
-  }
-
-  @Override
-  public RelNode convert(RelNode rel) {
-    final Project project = (Project) rel;
-    final RelNode input = project.getInput();
-
-    return new BeamProjectRel(project.getCluster(),
-        project.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
-        project.getProjects(), project.getRowType());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
deleted file mode 100644
index 56ddcf3..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/rule/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.calcite.plan.RelOptRule} to generate {@link org.beam.dsls.sql.rel.BeamRelNode}.
- */
-package org.beam.dsls.sql.rule;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
deleted file mode 100644
index 3816063..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BaseBeamTable.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.ScannableTable;
-import org.apache.calcite.schema.Schema.TableType;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
-import org.beam.dsls.sql.planner.BeamQueryPlanner;
-
-/**
- * Each IO in Beam has one table schema, by extending {@link BaseBeamTable}.
- */
-public abstract class BaseBeamTable implements ScannableTable, Serializable {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = -1262988061830914193L;
-  private RelDataType relDataType;
-
-  protected BeamSQLRecordType beamSqlRecordType;
-
-  public BaseBeamTable(RelProtoDataType protoRowType) {
-    this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
-    this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
-  }
-
-  /**
-   * In Beam SQL, there's no difference between a batch query and a streaming
-   * query. {@link BeamIOType} is used to validate the sources.
-   */
-  public abstract BeamIOType getSourceType();
-
-  /**
-   * create a {@code IO.read()} instance to read from source.
-   *
-   */
-  public abstract PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader();
-
-  /**
-   * create a {@code IO.write()} instance to write to target.
-   *
-   */
-  public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
-
-  @Override
-  public Enumerable<Object[]> scan(DataContext root) {
-    // not used as Beam SQL uses its own execution engine
-    return null;
-  }
-
-  @Override
-  public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-    return relDataType;
-  }
-
-  /**
-   * Not used {@link Statistic} to optimize the plan.
-   */
-  @Override
-  public Statistic getStatistic() {
-    return Statistics.UNKNOWN;
-  }
-
-  /**
-   * all sources are treated as TABLE in Beam SQL.
-   */
-  @Override
-  public TableType getJdbcTableType() {
-    return TableType.TABLE;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
deleted file mode 100644
index 5e55b0f..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamIOType.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-
-/**
- * Type as a source IO, determined whether it's a STREAMING process, or batch
- * process.
- */
-public enum BeamIOType implements Serializable {
-  BOUNDED, UNBOUNDED;
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
deleted file mode 100644
index dc8e381..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Field type information in {@link BeamSQLRow}.
- *
- */
-//@DefaultCoder(BeamSQLRecordTypeCoder.class)
-public class BeamSQLRecordType implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = -5318734648766104712L;
-  private List<String> fieldsName = new ArrayList<>();
-  private List<SqlTypeName> fieldsType = new ArrayList<>();
-
-  public static BeamSQLRecordType from(RelDataType tableInfo) {
-    BeamSQLRecordType record = new BeamSQLRecordType();
-    for (RelDataTypeField f : tableInfo.getFieldList()) {
-      record.fieldsName.add(f.getName());
-      record.fieldsType.add(f.getType().getSqlTypeName());
-    }
-    return record;
-  }
-
-  public int size() {
-    return fieldsName.size();
-  }
-
-  public List<String> getFieldsName() {
-    return fieldsName;
-  }
-
-  public void setFieldsName(List<String> fieldsName) {
-    this.fieldsName = fieldsName;
-  }
-
-  public List<SqlTypeName> getFieldsType() {
-    return fieldsType;
-  }
-
-  public void setFieldsType(List<SqlTypeName> fieldsType) {
-    this.fieldsType = fieldsType;
-  }
-
-  @Override
-  public String toString() {
-    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
deleted file mode 100644
index 2989cb9..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A {@link Coder} for {@link BeamSQLRecordType}.
- *
- */
-public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final VarIntCoder intCoder = VarIntCoder.of();
-
-  private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
-  private BeamSQLRecordTypeCoder(){}
-
-  public static BeamSQLRecordTypeCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(BeamSQLRecordType value, OutputStream outStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    Context nested = context.nested();
-    intCoder.encode(value.size(), outStream, nested);
-    for(String fieldName : value.getFieldsName()){
-      stringCoder.encode(fieldName, outStream, nested);
-    }
-    for(SqlTypeName fieldType : value.getFieldsType()){
-      stringCoder.encode(fieldType.name(), outStream, nested);
-    }
-    outStream.flush();
-  }
-
-  @Override
-  public BeamSQLRecordType decode(InputStream inStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    BeamSQLRecordType typeRecord = new BeamSQLRecordType();
-    Context nested = context.nested();
-    int size = intCoder.decode(inStream, nested);
-    for(int idx=0; idx<size; ++idx){
-      typeRecord.getFieldsName().add(stringCoder.decode(inStream, nested));
-    }
-    for(int idx=0; idx<size; ++idx){
-      typeRecord.getFieldsType().add(SqlTypeName.valueOf(stringCoder.decode(inStream, nested)));
-    }
-    return typeRecord;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-    // TODO Auto-generated method stub
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
deleted file mode 100644
index db93168..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSQLRow.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Repersent a generic ROW record in Beam SQL.
- *
- */
-public class BeamSQLRow implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 4569220242480160895L;
-
-  private List<Integer> nullFields = new ArrayList<>();
-  private List<Object> dataValues;
-  private BeamSQLRecordType dataType;
-
-  public BeamSQLRow(BeamSQLRecordType dataType) {
-    this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
-    for(int idx=0; idx<dataType.size(); ++idx){
-      dataValues.add(null);
-    }
-  }
-
-  public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
-    this.dataValues = dataValues;
-    this.dataType = dataType;
-  }
-
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
-  }
-
-  public void addField(int index, Object fieldValue) {
-    if(fieldValue == null){
-      dataValues.set(index, fieldValue);
-      if(!nullFields.contains(index)){nullFields.add(index);}
-      return;
-    }
-
-    SqlTypeName fieldType = dataType.getFieldsType().get(index);
-    switch (fieldType) {
-    case INTEGER:
-    case SMALLINT:
-    case TINYINT:
-      if(!(fieldValue instanceof Integer)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case DOUBLE:
-      if(!(fieldValue instanceof Double)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case BIGINT:
-      if(!(fieldValue instanceof Long)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case FLOAT:
-      if(!(fieldValue instanceof Float)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case VARCHAR:
-      if(!(fieldValue instanceof String)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    case TIME:
-    case TIMESTAMP:
-      if(!(fieldValue instanceof Date)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }
-      break;
-    default:
-      throw new UnsupportedDataTypeException(fieldType);
-    }
-    dataValues.set(index, fieldValue);
-  }
-
-
-  public int getInteger(int idx) {
-    return (Integer) getFieldValue(idx);
-  }
-
-  public double getDouble(int idx) {
-    return (Double) getFieldValue(idx);
-  }
-
-  public long getLong(int idx) {
-    return (Long) getFieldValue(idx);
-  }
-
-  public String getString(int idx) {
-    return (String) getFieldValue(idx);
-  }
-
-  public Date getDate(int idx) {
-    return (Date) getFieldValue(idx);
-  }
-
-  public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
-  }
-
-  public Object getFieldValue(int fieldIdx) {
-    if(nullFields.contains(fieldIdx)){
-      return null;
-    }
-
-    Object fieldValue = dataValues.get(fieldIdx);
-    SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
-
-    switch (fieldType) {
-    case INTEGER:
-    case SMALLINT:
-    case TINYINT:
-      if(!(fieldValue instanceof Integer)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return Integer.valueOf(fieldValue.toString());
-      }
-    case DOUBLE:
-      if(!(fieldValue instanceof Double)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return Double.valueOf(fieldValue.toString());
-      }
-    case BIGINT:
-      if(!(fieldValue instanceof Long)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return Long.valueOf(fieldValue.toString());
-      }
-    case FLOAT:
-      if(!(fieldValue instanceof Float)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return Float.valueOf(fieldValue.toString());
-      }
-    case VARCHAR:
-      if(!(fieldValue instanceof String)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return fieldValue.toString();
-      }
-    case TIME:
-    case TIMESTAMP:
-      if(!(fieldValue instanceof Date)){
-        throw new InvalidFieldException(String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-      }else{
-        return fieldValue;
-      }
-    default:
-      throw new UnsupportedDataTypeException(fieldType);
-    }
-  }
-
-  public int size() {
-    return dataValues.size();
-  }
-
-  public List<Object> getDataValues() {
-    return dataValues;
-  }
-
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
-  public BeamSQLRecordType getDataType() {
-    return dataType;
-  }
-
-  public void setDataType(BeamSQLRecordType dataType) {
-    this.dataType = dataType;
-  }
-
-  public void setNullFields(List<Integer> nullFields) {
-    this.nullFields = nullFields;
-  }
-
-  public List<Integer> getNullFields() {
-    return nullFields;
-  }
-
-  @Override
-  public String toString() {
-    return "BeamSQLRow [dataValues=" + dataValues + ", dataType=" + dataType + "]";
-  }
-
-  /**
-   * Return data fields as key=value.
-   */
-  public String valueInString() {
-    StringBuffer sb = new StringBuffer();
-    for (int idx = 0; idx < size(); ++idx) {
-      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
-    }
-    return sb.substring(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    BeamSQLRow other = (BeamSQLRow) obj;
-    return toString().equals(other.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
deleted file mode 100644
index 00af18d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Date;
-import java.util.List;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.DoubleCoder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-
-/**
- * A {@link Coder} encodes {@link BeamSQLRow}.
- *
- */
-public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
-  private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
-
-  private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
-
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final BigEndianIntegerCoder intCoder = BigEndianIntegerCoder.of();
-  private static final BigEndianLongCoder longCoder = BigEndianLongCoder.of();
-  private static final DoubleCoder doubleCoder = DoubleCoder.of();
-
-  private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
-  private BeamSqlRowCoder(){}
-
-  public static BeamSqlRowCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(BeamSQLRow value, OutputStream outStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    recordTypeCoder.encode(value.getDataType(), outStream, context);
-    listCoder.encode(value.getNullFields(), outStream, context);
-
-    Context nested = context.nested();
-
-    for (int idx = 0; idx < value.size(); ++idx) {
-      if(value.getNullFields().contains(idx)){
-        continue;
-      }
-
-      switch (value.getDataType().getFieldsType().get(idx)) {
-      case INTEGER:
-      case SMALLINT:
-      case TINYINT:
-        intCoder.encode(value.getInteger(idx), outStream, nested);
-        break;
-      case DOUBLE:
-      case FLOAT:
-        doubleCoder.encode(value.getDouble(idx), outStream, nested);
-        break;
-      case BIGINT:
-        longCoder.encode(value.getLong(idx), outStream, nested);
-        break;
-      case VARCHAR:
-        stringCoder.encode(value.getString(idx), outStream, nested);
-        break;
-      case TIME:
-      case TIMESTAMP:
-        longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
-        break;
-
-      default:
-        throw new UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
-      }
-    }
-  }
-
-  @Override
-  public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-      throws CoderException, IOException {
-    BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
-    List<Integer> nullFields = listCoder.decode(inStream, context);
-
-    BeamSQLRow record = new BeamSQLRow(type);
-    record.setNullFields(nullFields);
-
-    for (int idx = 0; idx < type.size(); ++idx) {
-      if(nullFields.contains(idx)){
-        continue;
-      }
-
-      switch (type.getFieldsType().get(idx)) {
-      case INTEGER:
-      case SMALLINT:
-      case TINYINT:
-        record.addField(idx, intCoder.decode(inStream, context));
-        break;
-      case DOUBLE:
-      case FLOAT:
-        record.addField(idx, doubleCoder.decode(inStream, context));
-        break;
-      case BIGINT:
-        record.addField(idx, longCoder.decode(inStream, context));
-        break;
-      case VARCHAR:
-        record.addField(idx, stringCoder.decode(inStream, context));
-        break;
-      case TIME:
-      case TIMESTAMP:
-        record.addField(idx, new Date(longCoder.decode(inStream, context)));
-        break;
-
-      default:
-        throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
-      }
-    }
-
-    return record;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
deleted file mode 100644
index 6240426..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/InvalidFieldException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-public class InvalidFieldException extends RuntimeException {
-
-  public InvalidFieldException() {
-    super();
-  }
-
-  public InvalidFieldException(String message) {
-    super(message);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
deleted file mode 100644
index 9a2235e..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/UnsupportedDataTypeException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema;
-
-import org.apache.calcite.sql.type.SqlTypeName;
-
-public class UnsupportedDataTypeException extends RuntimeException {
-
-  public UnsupportedDataTypeException(SqlTypeName unsupportedType){
-    super(String.format("Not support data type [%s]", unsupportedType));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
deleted file mode 100644
index 2570763..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema.kafka;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Kafka topic that saves records as CSV format.
- *
- * <p>A record value is one row: its fields joined by {@link #DELIMITER}, encoded as UTF-8.
- * Fields are neither quoted nor escaped, so a field containing the delimiter cannot round-trip.
- */
-public class BeamKafkaCSVTable extends BeamKafkaTable {
-
-  private static final long serialVersionUID = 4754022536543333984L;
-
-  /** Separator between the fields of one record. */
-  public static final String DELIMITER = ",";
-  private static final Logger LOG = LoggerFactory.getLogger(BeamKafkaCSVTable.class);
-
-  /**
-   * Creates a table over the given Kafka topics.
-   *
-   * @param protoRowType row type of the table
-   * @param bootstrapServers Kafka bootstrap servers
-   * @param topics Kafka topics backing the table
-   */
-  public BeamKafkaCSVTable(RelProtoDataType protoRowType, String bootstrapServers,
-      List<String> topics) {
-    super(protoRowType, bootstrapServers, topics);
-  }
-
-  @Override
-  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
-      getPTransformForInput() {
-    return new CsvRecorderDecoder(beamSqlRecordType);
-  }
-
-  @Override
-  public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
-      getPTransformForOutput() {
-    return new CsvRecorderEncoder(beamSqlRecordType);
-  }
-
-  /**
-   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
-   *
-   * <p>Only the value of each pair is read. A value whose field count differs from the record
-   * type is logged and dropped instead of being emitted.
-   */
-  public static class CsvRecorderDecoder
-      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
-    private final BeamSQLRecordType recordType;
-
-    /**
-     * Creates a decoder for rows of the given record type.
-     *
-     * @param recordType row type that decoded records must match in field count
-     */
-    public CsvRecorderDecoder(BeamSQLRecordType recordType) {
-      this.recordType = recordType;
-    }
-
-    @Override
-    public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
-      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          // Decode as UTF-8 explicitly rather than with the platform default charset.
-          String rowInString = new String(c.element().getValue(), StandardCharsets.UTF_8);
-          String[] parts = rowInString.split(BeamKafkaCSVTable.DELIMITER);
-          if (parts.length != recordType.size()) {
-            // Include the raw row in the log so the rejected record can be identified.
-            LOG.error("invalid record: {}", rowInString);
-            return;
-          }
-
-          BeamSQLRow sourceRecord = new BeamSQLRow(recordType);
-          for (int idx = 0; idx < parts.length; ++idx) {
-            sourceRecord.addField(idx, parts[idx]);
-          }
-          c.output(sourceRecord);
-        }
-      }));
-    }
-  }
-
-  /**
-   * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
-   *
-   * <p>The key of every produced pair is an empty byte array; the value is the CSV row.
-   */
-  public static class CsvRecorderEncoder
-      extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
-    private final BeamSQLRecordType recordType;
-
-    /**
-     * Creates an encoder.
-     *
-     * @param recordType row type of the rows to encode; stored but not read by this class
-     */
-    public CsvRecorderEncoder(BeamSQLRecordType recordType) {
-      this.recordType = recordType;
-    }
-
-    @Override
-    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
-      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
-        @ProcessElement
-        public void processElement(ProcessContext c) {
-          BeamSQLRow in = c.element();
-          StringBuilder sb = new StringBuilder();
-          for (int idx = 0; idx < in.size(); ++idx) {
-            // Write the delimiter between fields only, so a row without fields encodes
-            // to an empty value instead of failing on a missing leading delimiter.
-            if (idx > 0) {
-              sb.append(DELIMITER);
-            }
-            sb.append(in.getFieldValue(idx).toString());
-          }
-          c.output(KV.of(new byte[] {}, sb.toString().getBytes(StandardCharsets.UTF_8)));
-        }
-      }));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
deleted file mode 100644
index 482383b..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.schema.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.io.kafka.KafkaIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.beam.dsls.sql.schema.BaseBeamTable;
-import org.beam.dsls.sql.schema.BeamIOType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamKafkaTable} represents a Kafka topic, as source or target. Subclasses
- * supply the conversion between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
- */
-public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
-
-  private static final long serialVersionUID = -634715473399906527L;
-
-  private String bootstrapServers;
-  private List<String> topics;
-  // Remains null until updateConsumerProperties(...) is called.
-  private Map<String, Object> configUpdates;
-
-  /**
-   * Creates a table with a row type only; bootstrap servers and topics are left unset.
-   *
-   * @param protoRowType row type of the table
-   */
-  protected BeamKafkaTable(RelProtoDataType protoRowType) {
-    super(protoRowType);
-  }
-
-  /**
-   * Creates a table over the given Kafka topics.
-   *
-   * @param protoRowType row type of the table
-   * @param bootstrapServers bootstrap servers handed to {@link KafkaIO}
-   * @param topics topics to read from; writing requires exactly one topic
-   */
-  public BeamKafkaTable(RelProtoDataType protoRowType, String bootstrapServers,
-      List<String> topics) {
-    super(protoRowType);
-    this.bootstrapServers = bootstrapServers;
-    this.topics = topics;
-  }
-
-  /**
-   * Sets the consumer properties applied to the Kafka reader, replacing any set earlier.
-   *
-   * @param configUpdates consumer properties handed to {@link KafkaIO} when reading
-   * @return this table, to allow chaining
-   */
-  public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
-    this.configUpdates = configUpdates;
-    return this;
-  }
-
-  @Override
-  public BeamIOType getSourceType() {
-    return BeamIOType.UNBOUNDED;
-  }
-
-  /** Returns the transform that converts Kafka key/value pairs into rows. */
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
-      getPTransformForInput();
-
-  /** Returns the transform that converts rows into Kafka key/value pairs. */
-  public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
-      getPTransformForOutput();
-
-  @Override
-  public PTransform<? super PBegin, PCollection<BeamSQLRow>> buildIOReader() {
-    return new PTransform<PBegin, PCollection<BeamSQLRow>>() {
-
-      @Override
-      public PCollection<BeamSQLRow> expand(PBegin input) {
-        // Hand KafkaIO an empty map, never null, when no consumer properties were set.
-        Map<String, Object> consumerProperties = configUpdates == null
-            ? Collections.<String, Object>emptyMap() : configUpdates;
-        return input.apply("read",
-            KafkaIO.<byte[], byte[]>read().withBootstrapServers(bootstrapServers).withTopics(topics)
-                .updateConsumerProperties(consumerProperties).withKeyCoder(ByteArrayCoder.of())
-                .withValueCoder(ByteArrayCoder.of()).withoutMetadata())
-            .apply("in_format", getPTransformForInput());
-      }
-    };
-  }
-
-  @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
-    // The writer below targets topics.get(0) only, so reject any other topic configuration.
-    checkArgument(topics != null && topics.size() == 1,
-        "Only one topic can be acceptable as output.");
-
-    return new PTransform<PCollection<BeamSQLRow>, PDone>() {
-      @Override
-      public PDone expand(PCollection<BeamSQLRow> input) {
-        return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
-            KafkaIO.<byte[], byte[]>write().withBootstrapServers(bootstrapServers)
-                .withTopic(topics.get(0)).withKeyCoder(ByteArrayCoder.of())
-                .withValueCoder(ByteArrayCoder.of()));
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
deleted file mode 100644
index 822fce7..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/kafka/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Table schema for KafkaIO.
- */
-package org.beam.dsls.sql.schema.kafka;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
deleted file mode 100644
index ef9cc7d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/schema/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Defines table schemas that map to Beam IO components.
- *
- */
-package org.beam.dsls.sql.schema;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
deleted file mode 100644
index 06db280..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLFilterFn.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.rel.BeamFilterRel;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
- *
- * <p>A row is emitted unchanged when the first value produced by the expression executor is
- * {@code true}; a {@code false} or {@code null} value drops the row.
- */
-public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-  private static final long serialVersionUID = -1256111753670606705L;
-
-  private String stepName;
-  private BeamSQLExpressionExecutor executor;
-
-  /**
-   * Creates the filter function.
-   *
-   * @param stepName name of the step; stored but not read in this class
-   * @param executor evaluates the filter condition for each row
-   */
-  public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-  }
-
-  /** Calls {@code prepare()} on the expression executor. */
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  /**
-   * Evaluates the condition on the current row and outputs the row when it holds.
-   *
-   * @param c context providing the input row and receiving the output
-   */
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    BeamSQLRow in = c.element();
-
-    List<Object> result = executor.execute(in);
-
-    // A null condition result would throw on unboxing; treat it as "not true" and drop the row.
-    Boolean matched = (Boolean) result.get(0);
-    if (matched != null && matched) {
-      c.output(in);
-    }
-  }
-
-  /** Calls {@code close()} on the expression executor. */
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
deleted file mode 100644
index 1014c0d..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * A test {@link DoFn} that prints every row it receives to standard output.
- */
-public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
-  private static final long serialVersionUID = -1256111753670606705L;
-
-  /** Name of the step; stored but not read in this class. */
-  private String stepName;
-
-  /**
-   * Creates the function.
-   *
-   * @param stepName name of the step this function belongs to
-   */
-  public BeamSQLOutputToConsoleFn(String stepName) {
-    this.stepName = stepName;
-  }
-
-  /**
-   * Prints the data values of the current row, prefixed with {@code "Output: "}.
-   *
-   * @param c context providing the input row
-   */
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    BeamSQLRow row = c.element();
-    System.out.println("Output: " + row.getDataValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
deleted file mode 100644
index 12061d2..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.beam.dsls.sql.rel.BeamProjectRel;
-import org.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
- *
- * <p>For each input row, the values produced by the expression executor are copied, in
- * order, into a new row of the output record type.
- */
-public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-  private static final long serialVersionUID = -1046605249999014608L;
-
-  private String stepName;
-  private BeamSQLExpressionExecutor executor;
-  private BeamSQLRecordType outputRecordType;
-
-  /**
-   * Creates the projection function.
-   *
-   * @param stepName name of the step; stored but not read in this class
-   * @param executor evaluates the projected expressions for each row
-   * @param outputRecordType record type of the rows this function emits
-   */
-  public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
-      BeamSQLRecordType outputRecordType) {
-    this.stepName = stepName;
-    this.executor = executor;
-    this.outputRecordType = outputRecordType;
-  }
-
-  /** Calls {@code prepare()} on the expression executor. */
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  /**
-   * Evaluates the expressions on the current row and outputs the resulting row.
-   *
-   * @param c context providing the input row and receiving the output
-   */
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    List<Object> results = executor.execute(c.element());
-
-    BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
-    int fieldIndex = 0;
-    for (Object value : results) {
-      outRow.addField(fieldIndex, value);
-      fieldIndex++;
-    }
-
-    c.output(outRow);
-  }
-
-  /** Calls {@code close()} on the expression executor. */
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
deleted file mode 100644
index 2607abf..0000000
--- a/dsls/sql/src/main/java/org/beam/dsls/sql/transform/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline.
- */
-package org.beam.dsls.sql.transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/529bc9d9/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
new file mode 100644
index 0000000..733b056
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.BeforeClass;
+
+/**
+ * Prepares a shared {@code BeamSqlRunner} with the tables used in tests.
+ */
+public class BasePlanner {
+  public static BeamSqlRunner runner = new BeamSqlRunner();
+
+  /** Registers the test tables on {@link #runner}. */
+  @BeforeClass
+  public static void prepare() {
+    runner.addTable("ORDER_DETAILS", getTable());
+    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    runner.addTable("SUB_ORDER_RAM", getTable());
+  }
+
+  /** Describes the row type shared by every table built in this class. */
+  private static RelProtoDataType orderRowType() {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory typeFactory) {
+        return typeFactory.builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("order_time", SqlTypeName.TIMESTAMP)
+            .build();
+      }
+    };
+  }
+
+  /** Returns a mocked table with the shared row type. */
+  private static BaseBeamTable getTable() {
+    return new MockedBeamSQLTable(orderRowType());
+  }
+
+  /**
+   * Returns a Kafka CSV table with the shared row type.
+   *
+   * @param bootstrapServer Kafka bootstrap server of the table
+   * @param topic the single topic of the table
+   * @return the table, with {@code ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} set to "latest"
+   */
+  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
+    Map<String, Object> consumerPara = new HashMap<String, Object>();
+    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+
+    BeamKafkaCSVTable kafkaTable =
+        new BeamKafkaCSVTable(orderRowType(), bootstrapServer, Arrays.asList(topic));
+    return kafkaTable.updateConsumerProperties(consumerPara);
+  }
+}