Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/05 23:34:52 UTC

[1/3] beam git commit: DSL interface for Beam SQL

Repository: beam
Updated Branches:
  refs/heads/DSL_SQL dedabff1f -> 9395fbb3c


http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index d4e1db2..4795b2c 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.beam.dsls.sql.rel;
 
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.PAssert;
@@ -36,7 +37,6 @@ import org.junit.Test;
 public class BeamValuesRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
   private static MockedBeamSQLTable stringTable = MockedBeamSQLTable
       .of(SqlTypeName.VARCHAR, "name",
           SqlTypeName.VARCHAR, "description");
@@ -49,7 +49,7 @@ public class BeamValuesRelTest {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.VARCHAR, "name",
         SqlTypeName.VARCHAR, "description",
@@ -61,7 +61,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.INTEGER, "c0",
         SqlTypeName.INTEGER, "c1",
@@ -73,7 +73,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.INTEGER, "EXPR$0",
         SqlTypeName.CHAR, "EXPR$1",
@@ -84,8 +84,8 @@ public class BeamValuesRelTest {
 
   @BeforeClass
   public static void prepareClass() {
-    runner.addTableMetadata("string_table", stringTable);
-    runner.addTableMetadata("int_table", intTable);
+    BeamSqlEnv.registerTable("string_table", stringTable);
+    BeamSqlEnv.registerTable("int_table", intTable);
   }
 
   @Before
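
For reference, here is the migrated test pattern as one self-contained sketch; the
table and query names below are illustrative, not taken from this commit:

  import org.apache.beam.dsls.sql.BeamSqlCli;
  import org.apache.beam.dsls.sql.BeamSqlEnv;
  import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
  import org.apache.beam.dsls.sql.schema.BeamSQLRow;
  import org.apache.beam.sdk.testing.TestPipeline;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.calcite.sql.type.SqlTypeName;
  import org.junit.BeforeClass;
  import org.junit.Rule;
  import org.junit.Test;

  public class MyValuesTest {
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    @BeforeClass
    public static void prepareClass() {
      // tables are now registered once on the shared static environment,
      // instead of on a per-test BeamSQLEnvironment instance
      BeamSqlEnv.registerTable("my_table",
          MockedBeamSQLTable.of(SqlTypeName.VARCHAR, "name"));
    }

    @Test
    public void testInsertValues() throws Exception {
      String sql = "insert into my_table(name) values ('hello')";
      // compilePipeline is the new static entry point on BeamSqlCli
      PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
      // assertions over `rows` follow the PAssert pattern shown in the diff above
      pipeline.run().waitUntilFinish();
    }
  }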

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
index 6f24e2a..cb268bf 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.dsls.sql.schema;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.BasePlanner;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -27,17 +29,19 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Test case for BeamPCollectionTable.
  */
 public class BeamPCollectionTableTest extends BasePlanner{
-  public static TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
 
-  @BeforeClass
-  public static void prepareTable(){
+  @Before
+  public void prepareTable(){
     RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
@@ -51,14 +55,16 @@ public class BeamPCollectionTableTest extends BasePlanner{
     row.addField(0, 1);
     row.addField(1, "hello world.");
     PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
-    runner.addTableMetadata("COLLECTION_TABLE",
+    BeamSqlEnv.registerTable("COLLECTION_TABLE",
         new BeamPCollectionTable(inputStream, protoRowType));
   }
 
   @Test
   public void testSelectFromPCollectionTable() throws Exception{
     String sql = "select c1, c2 from COLLECTION_TABLE";
-    runner.executionPlan(sql);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
+
+    pipeline.run().waitUntilFinish();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index bc6343b..985b667 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -74,7 +74,7 @@ public class BeamSqlRowCoderTest {
     row.addField("col_timestamp", new Date());
 
 
-    BeamSqlRowCoder coder = BeamSqlRowCoder.of();
+    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
     CoderProperties.coderDecodeEncodeEqual(coder, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index f174b9c..dadd53b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -23,11 +23,11 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.sql.SqlKind;
@@ -62,8 +61,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   public TestPipeline p = TestPipeline.create();
 
   private List<AggregateCall> aggCalls;
-  private BeamSQLRecordType keyType = initTypeOfSqlRow(
-      Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+
+  private BeamSQLRecordType keyType;
+  private BeamSQLRecordType aggPartType;
+  private BeamSQLRecordType outputType;
+
+  private BeamSqlRowCoder inRecordCoder;
+  private BeamSqlRowCoder keyCoder;
+  private BeamSqlRowCoder aggCoder;
+  private BeamSqlRowCoder outRecordCoder;
 
   /**
    * This step equals to below query.
@@ -97,21 +103,25 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     //1. extract fields in group-by key part
     PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = input.apply("exGroupBy",
         WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))));
+            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder));
 
     //2. apply a GroupByKey.
     PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
+        .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
+            IterableCoder.<BeamSQLRow>of(inRecordCoder)));
 
     //3. run aggregation functions
     PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
         Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
-            new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)));
+            new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
 
     //4. flat KV to a single record
     PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            BeamSQLRecordType.from(prepareFinalRowType()), aggCalls)));
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls)));
+    mergedStream.setCoder(outRecordCoder);
 
     //assert function BeamAggregationTransform.AggregationGroupByKeyFn
     PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
@@ -126,17 +136,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 }
 
   private void setupEnvironment() {
-    regiesterCoder();
     prepareAggregationCalls();
-  }
-
-  /**
-   * Add Coders in BeamSQL.
-   */
-  private void regiesterCoder() {
-    CoderRegistry cr = p.getCoderRegistry();
-    cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
-    cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+    prepareTypeAndCoder();
   }
 
   /**
@@ -327,26 +328,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   }
 
   /**
-   * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+   * Coders used in aggregation steps.
    */
-  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
-    return Arrays.asList(
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-            inputRows.get(0)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
-            inputRows.get(1)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
-            inputRows.get(2)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
-            inputRows.get(3)));
-  }
+  private void prepareTypeAndCoder() {
+    inRecordCoder = new BeamSqlRowCoder(inputRowType);
 
-  /**
-   * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
-   */
-  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
-      throws ParseException {
-    BeamSQLRecordType aggPartType = initTypeOfSqlRow(
+    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+    keyCoder = new BeamSqlRowCoder(keyType);
+
+    aggPartType = initTypeOfSqlRow(
         Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
 
             KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
@@ -369,6 +359,32 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
             KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
             KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
             ));
+    aggCoder = new BeamSqlRowCoder(aggPartType);
+
+    outputType = prepareFinalRowType();
+    outRecordCoder = new BeamSqlRowCoder(outputType);
+  }
+
+  /**
+   * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+   */
+  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
+    return Arrays.asList(
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+            inputRows.get(0)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+            inputRows.get(1)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+            inputRows.get(2)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+            inputRows.get(3)));
+  }
+
+  /**
+   * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
+   */
+  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
+      throws ParseException {
     return Arrays.asList(
             KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
                 new BeamSQLRow(aggPartType, Arrays.<Object>asList(
@@ -387,7 +403,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * Row type of final output row.
    */
-  private RelDataType prepareFinalRowType() {
+  private BeamSQLRecordType prepareFinalRowType() {
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     List<KV<String, SqlTypeName>> columnMetadata =
         Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -415,14 +431,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return builder.build();
+    return BeamSQLRecordType.from(builder.build());
   }
 
   /**
    * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
    */
   private BeamSQLRow prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamSQLRow(BeamSQLRecordType.from(prepareFinalRowType()), Arrays.<Object>asList(
+    return new BeamSQLRow(outputType, Arrays.<Object>asList(
         1, 4L,
         10000L, 2500L, 4000L, 1000L,
         (short) 10, (short) 2, (short) 4, (short) 1,
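
The thrust of this change: BeamSqlRowCoder.of(), previously registered globally in
the CoderRegistry, is gone; a coder is now constructed per record type with
new BeamSqlRowCoder(recordType), so each intermediate PCollection must set its coder
explicitly. Condensed into a sketch, using the names from this diff:

  BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(keyType);
  BeamSqlRowCoder inRecordCoder = new BeamSqlRowCoder(inputRowType);

  // every step declares its coder, since no registry-wide coder exists any more
  PCollection<KV<BeamSQLRow, BeamSQLRow>> keyed = input
      .apply("exGroupBy", WithKeys
          .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
      .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder));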


[3/3] beam git commit: [BEAM-2010] expose programming interface

Posted by lc...@apache.org.
[BEAM-2010] expose programming interface

This closes #3250


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9395fbb3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9395fbb3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9395fbb3

Branch: refs/heads/DSL_SQL
Commit: 9395fbb3c8d747b53244fc54a582acfd3e106d76
Parents: dedabff 680a543
Author: Luke Cwik <lc...@google.com>
Authored: Mon Jun 5 16:34:41 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 16:34:41 2017 -0700

----------------------------------------------------------------------
 .../beam/dsls/sql/BeamSQLEnvironment.java       | 142 ----------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 166 +++++++++++++++++++
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    |  70 ++++++++
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    |  63 +++++++
 .../beam/dsls/sql/example/BeamSqlExample.java   | 106 +++++-------
 .../dsls/sql/planner/BeamPipelineCreator.java   |  17 +-
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  21 ++-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  57 +++++--
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  12 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |  17 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  25 ++-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  11 +-
 .../apache/beam/dsls/sql/rel/BeamRelNode.java   |  11 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  19 ++-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  10 +-
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |  22 +++
 .../dsls/sql/schema/BeamSQLRecordTypeCoder.java |  87 ----------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  34 ++--
 .../beam/dsls/sql/planner/BasePlanner.java      |  10 +-
 .../sql/planner/BeamGroupByExplainTest.java     |  18 +-
 .../sql/planner/BeamGroupByPipelineTest.java    |  18 +-
 .../sql/planner/BeamInvalidGroupByTest.java     |   5 +-
 .../BeamPlannerAggregationSubmitTest.java       |  12 +-
 .../sql/planner/BeamPlannerExplainTest.java     |   7 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |   3 +-
 .../beam/dsls/sql/rel/BeamSortRelTest.java      |  28 ++--
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    |  14 +-
 .../sql/schema/BeamPCollectionTableTest.java    |  18 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   2 +-
 .../transform/BeamAggregationTransformTest.java |  98 ++++++-----
 30 files changed, 628 insertions(+), 495 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: DSL interface for Beam SQL

Posted by lc...@apache.org.
DSL interface for Beam SQL


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/680a543d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/680a543d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/680a543d

Branch: refs/heads/DSL_SQL
Commit: 680a543d20d0d8bca25aced0a5f02c38529babf2
Parents: dedabff
Author: mingmxu <mi...@ebay.com>
Authored: Fri May 26 22:07:03 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 5 16:22:08 2017 -0700

----------------------------------------------------------------------
 .../beam/dsls/sql/BeamSQLEnvironment.java       | 142 ----------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  | 166 +++++++++++++++++++
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    |  70 ++++++++
 .../org/apache/beam/dsls/sql/BeamSqlEnv.java    |  63 +++++++
 .../beam/dsls/sql/example/BeamSqlExample.java   | 106 +++++-------
 .../dsls/sql/planner/BeamPipelineCreator.java   |  17 +-
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  21 ++-
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  57 +++++--
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  12 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |  17 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  25 ++-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  11 +-
 .../apache/beam/dsls/sql/rel/BeamRelNode.java   |  11 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  19 ++-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  10 +-
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |  22 +++
 .../dsls/sql/schema/BeamSQLRecordTypeCoder.java |  87 ----------
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  34 ++--
 .../beam/dsls/sql/planner/BasePlanner.java      |  10 +-
 .../sql/planner/BeamGroupByExplainTest.java     |  18 +-
 .../sql/planner/BeamGroupByPipelineTest.java    |  18 +-
 .../sql/planner/BeamInvalidGroupByTest.java     |   5 +-
 .../BeamPlannerAggregationSubmitTest.java       |  12 +-
 .../sql/planner/BeamPlannerExplainTest.java     |   7 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |   3 +-
 .../beam/dsls/sql/rel/BeamSortRelTest.java      |  28 ++--
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    |  14 +-
 .../sql/schema/BeamPCollectionTableTest.java    |  18 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   2 +-
 .../transform/BeamAggregationTransformTest.java |  98 ++++++-----
 30 files changed, 628 insertions(+), 495 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
deleted file mode 100644
index cdb25f5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql;
-
-import java.io.Serializable;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@code BeamSQLEnvironment} is the integrated environment of BeamSQL.
- * It provides runtime context to execute SQL queries as Beam pipeline,
- * including table metadata, SQL engine and a Beam pipeline translator.
- *
- * <h1>1. BeamSQL as DSL</h1>
- * <em>BeamSQL as DSL</em> enables developers to embed SQL queries when writing a Beam pipeline.
- * A typical pipeline with BeamSQL DSL is:
- * <pre>
- *{@code
-PipelineOptions options =  PipelineOptionsFactory...
-Pipeline pipeline = Pipeline.create(options);
-
-//prepare environment of BeamSQL
-BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create();
-//register table metadata
-sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata);
-//register UDF
-sqlEnv.registerUDF(String functionName, Method udfMethod);
-
-
-//explain a SQL statement, SELECT only, and return as a PCollection;
-PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(pipeline, String sqlStatement);
-//A PCollection explained by BeamSQL can be converted into a table, and apply queries on it;
-sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream);
-
-//apply more queries, even based on phase1Stream
-
-pipeline.run().waitUntilFinish();
- * }
- * </pre>
- *
- * <h1>2. BeamSQL as CLI</h1>
- * This feature is on planning, and not ready yet.
- *
- */
-public class BeamSQLEnvironment implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamSQLEnvironment.class);
-
-  public static final BeamSQLEnvironment INSTANCE = new BeamSQLEnvironment();
-
-  private SchemaPlus schema = Frameworks.createRootSchema(true);
-  private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
-
-  private BeamSQLEnvironment() {
-    //disable assertions in Calcite.
-    ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
-  }
-
-  /**
-   * Return an instance of {@code BeamSQLEnvironment}.
-   */
-  public static BeamSQLEnvironment create(){
-    return INSTANCE;
-  }
-
-  /**
-   * Add a schema.
-   *
-   */
-  public void addSchema(String schemaName, Schema scheme) {
-    schema.add(schemaName, schema);
-  }
-
-  /**
-   * add a {@link BaseBeamTable} to schema repository.
-   */
-  public void addTableMetadata(String tableName, BaseBeamTable tableMetadata) {
-    schema.add(tableName, tableMetadata);
-    planner.getSourceTables().put(tableName, tableMetadata);
-  }
-
-  /* Add a UDF function.
-   *
-   * <p>There're two requirements for function {@code methodName}:<br>
-   * 1. It must be a STATIC method;<br>
-   * 2. For a primitive parameter, use its wrapper class and handle NULL properly;
-   */
-  public void addUDFFunction(String functionName, Class<?> className, String methodName){
-    schema.add(functionName, ScalarFunctionImpl.create(className, methodName));
-  }
-
-  /**
-   * explain and display the execution plan.
-   */
-  public String executionPlan(String sqlString)
-    throws ValidationException, RelConversionException, SqlParseException {
-    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
-    String beamPlan = RelOptUtil.toString(exeTree);
-    LOG.info(String.format("beamPlan>\n%s", beamPlan));
-    return beamPlan;
-  }
-
-  /**
-   * {@code compileBeamPipeline} translate a SQL statement to executed as Beam data flow,
-   * which is linked with the given {@code pipeline}. The final output stream is returned as
-   * {@code PCollection} so more operations can be applied.
-   */
-  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
-      throws Exception{
-    PCollection<BeamSQLRow> resultStream = planner.compileBeamPipeline(sqlStatement, basePipeline);
-    return resultStream;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
new file mode 100644
index 0000000..8c2c5ad
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query into a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+TableSchema tableASchema = ...;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+    .apply(BeamSql.fromTextRow(tableASchema));
+TableSchema tableBSchema = ...;
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+    .apply(BeamSql.fromTextRow(tableBSchema));
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from TABLE_A";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(BeamSql.simpleQuery(sql1))
+        .withUdf("MY_FUNC", myFunc);
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+    new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+                .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+    .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(BeamSql.toTextRow()).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+   * all the input tables and results in a {@code PCollection<BeamSQLRow>} representing the output
+   * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+   * {@code PCollection<BeamSQLRow>}, each representing an input table.
+   *
+   * <p>It is an error to apply a {@link PCollectionTuple} that is missing any {@code table names}
+   * referenced within the query.
+   */
+  public static PTransform<PCollectionTuple, PCollection<BeamSQLRow>> query(String sqlQuery) {
+    return new QueryTransform(sqlQuery);
+  }
+
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>This is a simplified form of {@link #query(String)} where the query must reference
+   * a single input table.
+   */
+  public static PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>>
+  simpleQuery(String sqlQuery) throws Exception {
+    return new SimpleQueryTransform(sqlQuery);
+  }
+
+  /**
+   * A {@link PTransform} representing an execution plan for a SQL query.
+   */
+  public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSQLRow>> {
+    private String sqlQuery;
+    public QueryTransform(String sqlQuery) {
+      this.sqlQuery = sqlQuery;
+    }
+
+    @Override
+    public PCollection<BeamSQLRow> expand(PCollectionTuple input) {
+      BeamRelNode beamRelNode = null;
+      try {
+        beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
+      } catch (ValidationException | RelConversionException | SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+
+      try {
+        return beamRelNode.buildBeamPipeline(input);
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} representing an execution plan for a SQL query referencing
+   * a single table.
+   */
+  public static class SimpleQueryTransform
+      extends PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> {
+    private String sqlQuery;
+    public SimpleQueryTransform(String sqlQuery) {
+      this.sqlQuery = sqlQuery;
+    }
+
+    public SimpleQueryTransform withUdf(String udfName){
+      throw new BeamSqlUnsupportedException("Pending for UDF support");
+    }
+
+    @Override
+    public PCollection<BeamSQLRow> expand(PCollection<BeamSQLRow> input) {
+      SqlNode sqlNode;
+      try {
+        sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery);
+        BeamSqlEnv.planner.getPlanner().close();
+      } catch (SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+      BeamSqlRowCoder inputCoder = (BeamSqlRowCoder) input.getCoder();
+
+      if (sqlNode instanceof SqlSelect) {
+        SqlSelect select = (SqlSelect) sqlNode;
+        String tableName = select.getFrom().toString();
+        BeamSqlEnv.registerTable(tableName,
+            new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
+        return PCollectionTuple.of(new TupleTag<BeamSQLRow>(tableName), input)
+            .apply(BeamSql.query(sqlQuery));
+      } else {
+        throw new BeamSqlUnsupportedException(sqlNode.toString());
+      }
+    }
+  }
+}
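
A minimal sketch of the multi-table entry point; `ordersTable`/`usersTable` and
their row types are hypothetical, and each table is registered up front with
BeamSqlEnv (mirroring what SimpleQueryTransform does internally through
BeamPCollectionTable):

  BeamSqlEnv.registerTable("ORDERS", new BeamPCollectionTable(ordersTable, ordersRowType));
  BeamSqlEnv.registerTable("USERS", new BeamPCollectionTable(usersTable, usersRowType));

  // the tuple maps each table name referenced in the query to its PCollection
  PCollection<BeamSQLRow> result = PCollectionTuple
      .of(new TupleTag<BeamSQLRow>("ORDERS"), ordersTable)
      .and(new TupleTag<BeamSQLRow>("USERS"), usersTable)
      .apply(BeamSql.query(
          "select o.order_id, u.name from ORDERS o, USERS u where o.user_id = u.id"));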

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
new file mode 100644
index 0000000..6591589
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+
+  /**
+   * Returns a human-readable representation of the query execution plan.
+   */
+  public static String explainQuery(String sqlString)
+      throws ValidationException, RelConversionException, SqlParseException {
+    BeamRelNode exeTree = BeamSqlEnv.planner.convertToBeamRel(sqlString);
+    String beamPlan = RelOptUtil.toString(exeTree);
+    return beamPlan;
+  }
+
+  /**
+   * Compiles a SQL statement and returns the output {@code PCollection},
+   * attached to a newly created {@link Pipeline}.
+   */
+  public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement) throws Exception{
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
+        .as(PipelineOptions.class); // FlinkPipelineOptions.class
+    options.setJobName("BeamPlanCreator");
+    Pipeline pipeline = Pipeline.create(options);
+
+    return compilePipeline(sqlStatement, pipeline);
+  }
+
+  /**
+   * Compiles a SQL statement and returns the output {@code PCollection},
+   * attached to the given {@code basePipeline}.
+   */
+  public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
+      throws Exception{
+    PCollection<BeamSQLRow> resultStream =
+        BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline);
+    return resultStream;
+  }
+
+}
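
A short sketch of the two CLI entry points, assuming an "ORDERS" table has
already been registered through BeamSqlEnv.registerTable(...):

  // print a human-readable Beam execution plan
  System.out.println(BeamSqlCli.explainQuery("select order_id from ORDERS"));

  // compile against a freshly created pipeline, then run it
  PCollection<BeamSQLRow> result = BeamSqlCli.compilePipeline("select order_id from ORDERS");
  result.getPipeline().run().waitUntilFinish();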

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..af6c007
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parses, validates, optimizes and translates input SQL queries.
+ */
+public class BeamSqlEnv {
+  public static SchemaPlus schema;
+  public static BeamQueryPlanner planner;
+
+  static {
+    schema = Frameworks.createRootSchema(true);
+    planner = new BeamQueryPlanner(schema);
+  }
+
+  /**
+   * Registers a UDF function which can be used in SQL expressions.
+   */
+  public static void registerUdf(String functionName, Class<?> clazz, String methodName) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
+  }
+
+  /**
+   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+   */
+  public static void registerTable(String tableName, BaseBeamTable table) {
+    schema.add(tableName, table);
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Finds a {@link BaseBeamTable} by table name.
+   */
+  public static BaseBeamTable findTable(String tableName){
+    return planner.getSourceTables().get(tableName);
+  }
+}
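
A sketch of preparing the shared environment; UdfFns and the "ORDERS" table are
illustrative. As with the old addUDFFunction, the UDF method must be static and
should use wrapper classes so NULL arguments can be handled:

  public class UdfFns {
    // static, with a wrapper type so a NULL argument can flow through
    public static Integer cubic(Integer n) {
      return n == null ? null : n * n * n;
    }
  }

  BeamSqlEnv.registerUdf("CUBIC", UdfFns.class, "cubic");
  BeamSqlEnv.registerTable("ORDERS", ordersTable); // any BaseBeamTable implementation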

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 2695944..6a1b81d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -17,93 +17,61 @@
  */
 package org.apache.beam.dsls.sql.example;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.BeamSql;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is one quick example.
+ * This is a quick example that uses the Beam SQL DSL to create a data pipeline.
  *
- * <p>Before start, follow https://kafka.apache.org/quickstart to setup a Kafka
- * cluster locally, and run below commands to create required Kafka topics:
- * <pre>
- * <code>
- * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
- *   --partitions 1 --topic orders
- * bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
- *   --partitions 1 --topic sub_orders
- * </code>
- * </pre>
- * After run the application, produce several test records:
- * <pre>
- * <code>
- * bin/kafka-console-producer.sh --broker-list localhost:9092 --topic orders
- * invalid,record
- * 123445,0,100,3413423
- * 234123,3,232,3451231234
- * 234234,0,5,1234123
- * 345234,0,345234.345,3423
- * </code>
- * </pre>
- * Meanwhile, open another console to see the output:
- * <pre>
- * <code>
- * bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sub_orders
- * **Expected :
- * 123445,0,100.0
- * 345234,0,345234.345
- * </code>
- * </pre>
  */
-public class BeamSqlExample implements Serializable {
+public class BeamSqlExample {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlExample.class);
 
   public static void main(String[] args) throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
-        .as(PipelineOptions.class); // FlinkPipelineOptions.class
-    options.setJobName("BeamSqlExample");
-    Pipeline pipeline = Pipeline.create(options);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Pipeline p = Pipeline.create(options);
 
-    BeamSQLEnvironment runner = BeamSQLEnvironment.create();
-    runner.addTableMetadata("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
-    runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    //define the input row format
+    BeamSQLRecordType type = new BeamSQLRecordType();
+    type.addField("c1", SqlTypeName.INTEGER);
+    type.addField("c2", SqlTypeName.VARCHAR);
+    type.addField("c3", SqlTypeName.DOUBLE);
+    BeamSQLRow row = new BeamSQLRow(type);
+    row.addField(0, 1);
+    row.addField(1, "row");
+    row.addField(2, 1.0);
 
-    // case 2: insert into <table>(<fields>) select STREAM <fields> from
-    // <table> from <clause>
-    String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
-        + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
+    //create a source PCollection with Create.of();
+    PCollection<BeamSQLRow> inputTable = PBegin.in(p).apply(Create.of(row)
+        .withCoder(new BeamSqlRowCoder(type)));
 
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    //run a simple SQL query over input PCollection;
+    String sql = "select c2, c3 from TABLE_A where c1=1";
+    PCollection<BeamSQLRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
 
-    pipeline.run().waitUntilFinish();
-  }
-
-  public static BaseBeamTable getTable(String bootstrapServer, String topic) {
-    final RelProtoDataType protoRowType = new RelProtoDataType() {
+    //log out the output record;
+    outputStream.apply("log_result",
+        MapElements.<BeamSQLRow, Void>via(new SimpleFunction<BeamSQLRow, Void>() {
       @Override
-      public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+      public Void apply(BeamSQLRow input) {
+        LOG.info(input.valueInString());
+        return null;
       }
-    };
-
-    Map<String, Object> consumerPara = new HashMap<String, Object>();
-    consumerPara.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+    }));
 
-    return new BeamKafkaCSVTable(protoRowType, bootstrapServer, Arrays.asList(topic))
-        .updateConsumerProperties(consumerPara);
+    p.run().waitUntilFinish();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 1f3ba58..abdc66c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -18,16 +18,9 @@
 package org.apache.beam.dsls.sql.planner;
 
 import java.util.Map;
-
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
  * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam
@@ -37,19 +30,13 @@ import org.apache.beam.sdk.options.PipelineOptions;
 public class BeamPipelineCreator {
   private Map<String, BaseBeamTable> sourceTables;
 
-  private PipelineOptions options;
-
   private Pipeline pipeline;
 
   private boolean hasPersistent = false;
 
-  public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline pipeline) {
+  public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline basePipeline) {
     this.sourceTables = sourceTables;
-    this.pipeline = pipeline;
-
-    CoderRegistry cr = pipeline.getCoderRegistry();
-    cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
-    cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+    this.pipeline = basePipeline;
   }
 
   public Map<String, BaseBeamTable> getSourceTables() {

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 0a7407c..6f148d6 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -22,13 +22,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -95,17 +95,24 @@ public class BeamQueryPlanner {
   }
 
   /**
+   * Parses the input SQL query and returns a {@link SqlNode} as the grammar tree.
+   */
+  public SqlNode parseQuery(String sqlQuery) throws SqlParseException{
+    return planner.parse(sqlQuery);
+  }
+
+  /**
   * {@code compileBeamPipeline} translates a SQL statement into a Beam data flow,
   * linked with the given {@code pipeline}. The final output stream is returned as a
   * {@code PCollection} so more operations can be applied to it.
    */
-  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline pipeline)
+  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
       throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
-    BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, pipeline);
-
-    return relNode.buildBeamPipeline(planCreator);
+    BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, basePipeline);
+    // the input PCollectionTuple is empty, and is rebuilt in BeamIOSourceRel.
+    return relNode.buildBeamPipeline(PCollectionTuple.empty(basePipeline));
   }
 
   /**
@@ -155,4 +162,8 @@ public class BeamQueryPlanner {
     return sourceTables;
   }
 
+  public Planner getPlanner() {
+    return planner;
+  }
+
 }
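
The new parseQuery()/getPlanner() hooks are what BeamSql.SimpleQueryTransform
builds on; condensed from its expand() method shown earlier:

  SqlNode sqlNode = BeamSqlEnv.planner.parseQuery("select c1, c2 from MY_TABLE");
  BeamSqlEnv.planner.getPlanner().close(); // release the Calcite planner after parsing
  if (sqlNode instanceof SqlSelect) {
    String tableName = ((SqlSelect) sqlNode).getFrom().toString();
    // register tableName as a BeamPCollectionTable, then delegate to BeamSql.query(...)
  }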

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 3e147aa..6914883 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,11 +18,13 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -34,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
@@ -41,6 +44,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 import org.joda.time.Duration;
@@ -67,16 +71,17 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
     String stageName = BeamSQLRelUtils.getStageName(this);
 
     PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply("assignEventTimestamp", WithTimestamps
-          .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)));
+          .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+          .setCoder(upstream.getCoder());
     }
 
     PCollection<BeamSQLRow> windowStream = upstream.apply("window",
@@ -85,29 +90,59 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
-    //1. extract fields in group-by key part
+    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
     PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
         WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(windowFieldIdx, groupSet)));
+            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
+                windowFieldIdx, groupSet)))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, upstream.getCoder()));
 
-    //2. apply a GroupByKey.
     PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
+        .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
+            IterableCoder.<BeamSQLRow>of(upstream.getCoder())));
 
-    //3. run aggregation functions
+    BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
     PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
         Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
-                BeamSQLRecordType.from(input.getRowType()))));
+                BeamSQLRecordType.from(input.getRowType()))))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
 
-    //4. flat KV to a single record
     PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
             BeamSQLRecordType.from(getRowType()), getAggCallList())));
+    mergedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
 
     return mergedStream;
   }
 
+  /**
+   * Type of the sub-record used as the Group-By key.
+   */
+  private BeamSQLRecordType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSQLRecordType inputRecordType = BeamSQLRecordType.from(relDataType);
+    BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+    for (int i : groupSet.asList()) {
+      if (i != windowFieldIdx) {
+        typeOfKey.addField(inputRecordType.getFieldsName().get(i),
+            inputRecordType.getFieldsType().get(i));
+      }
+    }
+    return typeOfKey;
+  }
+
+  /**
+   * Type of the sub-record that represents the list of aggregation fields.
+   */
+  private BeamSQLRecordType exAggFieldsSchema() {
+    BeamSQLRecordType typeOfAggFields = new BeamSQLRecordType();
+    for (AggregateCall ac : getAggCallList()) {
+      typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName());
+    }
+    return typeOfAggFields;
+  }
+
   @Override
   public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator
       , ImmutableBitSet groupSet,

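The explicit setCoder() calls above are needed because BeamSqlRowCoder is now
schema-aware, so the coder registry can no longer infer it. A minimal sketch of
the wiring, with hypothetical field names and a hypothetical ExtractKeyFn (a
SerializableFunction<BeamSQLRow, BeamSQLRow>), neither of which is part of this
patch:

    // build the key schema and its coder (field names are illustrative)
    BeamSQLRecordType keyType = new BeamSQLRecordType();
    keyType.addField("site_id", SqlTypeName.INTEGER);
    BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(keyType);

    // every keyed stage states its KvCoder explicitly
    PCollection<KV<BeamSQLRow, BeamSQLRow>> keyed = rows
        .apply("exGroupBy", WithKeys.of(new ExtractKeyFn()))
        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, rows.getCoder()));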
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index f2c1bba..3387071 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -19,12 +19,14 @@ package org.apache.beam.dsls.sql.rel;
 
 import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -48,20 +50,20 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
 
     RelNode input = getInput();
-
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(planCreator);
+    PCollection<BeamSQLRow> upstream =
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> filterStream = upstream.apply(stageName,
         ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
+    filterStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
 
     return filterStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index bc94ab8..f821700 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -19,12 +19,13 @@ package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.base.Joiner;
 import java.util.List;
-
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
@@ -52,22 +53,24 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
   }
 
   /**
-   * Note that {@code BeamIOSinkRel} returns the input PCollection.
+   * Note that {@code BeamIOSinkRel} returns the input PCollection,
+   * which is the persisted PCollection.
    */
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
+
     RelNode input = getInput();
     String stageName = BeamSQLRelUtils.getStageName(this);
 
     PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(planCreator);
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    BaseBeamTable targetTable = planCreator.getSourceTables().get(sourceName);
+    BaseBeamTable targetTable = BeamSqlEnv.findTable(sourceName);
 
-    upstream.apply(stageName, targetTable.buildIOWriter());
+    PDone streamEnd = upstream.apply(stageName, targetTable.buildIOWriter());
 
     return upstream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index f4d5001..38de41e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -18,12 +18,13 @@
 package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.base.Joiner;
-
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
@@ -40,18 +41,24 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
 
-    String sourceName = Joiner.on('.').join(getTable().getQualifiedName()).replace(".(STREAM)", "");
-
-    BaseBeamTable sourceTable = planCreator.getSourceTables().get(sourceName);
+    String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> sourceStream = sourceTable.buildIOReader(planCreator.getPipeline());
-
-    return sourceStream;
+    TupleTag<BeamSQLRow> sourceTupleTag = new TupleTag<BeamSQLRow>(sourceName);
+    if (inputPCollections.has(sourceTupleTag)) {
+      //use the PCollection that is already registered in the input PCollectionTuple.
+      PCollection<BeamSQLRow> sourceStream = inputPCollections.get(sourceTupleTag);
+      return sourceStream;
+    } else {
+      //otherwise, the source PCollection is created with BaseBeamTable.buildIOReader().
+      BaseBeamTable sourceTable = BeamSqlEnv.findTable(sourceName);
+      return sourceTable.buildIOReader(inputPCollections.getPipeline());
+    }
   }
 
 }

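The TupleTag lookup gives callers a way to pre-bind a table to an existing
PCollection. A hedged usage sketch (the table name "ORDER_DETAILS" and the
`rows` collection are hypothetical, not from this patch):

    TupleTag<BeamSQLRow> tag = new TupleTag<BeamSQLRow>("ORDER_DETAILS");
    PCollectionTuple inputs = PCollectionTuple.of(tag, rows);
    // a BeamIOSourceRel that scans ORDER_DETAILS and receives `inputs`
    // now reuses `rows` instead of calling BaseBeamTable.buildIOReader()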
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index 954868d..e2645f1 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -21,13 +21,14 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
 import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -60,21 +61,21 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator)
+  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
     String stageName = BeamSQLRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(planCreator);
+    PCollection<BeamSQLRow> upstream =
+        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
     BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
 
     PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
         .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
 
     return projectStream;
-
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index ff2b5b6..ed58090 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -20,18 +20,19 @@ package org.apache.beam.dsls.sql.rel;
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.rel.RelNode;
 
 /**
- * A new method {@link #buildBeamPipeline(BeamPipelineCreator)} is added, it's
+ * A new method {@link #buildBeamPipeline(PCollectionTuple)} is added; it is
  * called by {@link BeamPipelineCreator}.
- *
  */
 public interface BeamRelNode extends RelNode {
 
   /**
-   * {@code #buildBeamPipeline(BeamPipelineCreator)} applies a transform to upstream,
-   * and generate an output {@code PCollection}.
+   * A {@link BeamRelNode} is a recursive structure; the
+   * {@link BeamPipelineCreator} visits it with a depth-first search
+   * (DFS) algorithm.
    */
-  PCollection<BeamSQLRow> buildBeamPipeline(BeamPipelineCreator planCreator) throws Exception;
+  PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
 }

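As a sketch of that DFS contract (this is not a class in the patch), an
implementation first recurses into its input, then applies its own transform;
MyRelFn stands in for a node-specific DoFn:

      @Override
      public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
          throws Exception {
        // visit the child node first ...
        PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(getInput())
            .buildBeamPipeline(inputPCollections);
        // ... then apply this node's transform
        return upstream.apply(ParDo.of(new MyRelFn()));
      }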
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 3df2f34..06a4edf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -26,16 +26,19 @@ import java.util.Comparator;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
@@ -120,11 +123,11 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(
-      BeamPipelineCreator planCreator) throws Exception {
+  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
     RelNode input = getInput();
     PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
-        .buildBeamPipeline(planCreator);
+        .buildBeamPipeline(inputPCollections);
     Type windowType = upstream.getWindowingStrategy().getWindowFn()
         .getWindowTypeDescriptor().getType();
     if (!windowType.equals(GlobalWindow.class)) {
@@ -137,16 +140,20 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     // first find the top (offset + count)
     PCollection<List<BeamSQLRow>> rawStream =
         upstream.apply("extractTopOffsetAndFetch",
-            Top.of(startIndex + count, comparator).withoutDefaults());
+            Top.of(startIndex + count, comparator).withoutDefaults())
+        .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
 
     // strip the `leading offset`
     if (startIndex > 0) {
       rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSQLRow>(startIndex, startIndex + count)));
+          new SubListFn<BeamSQLRow>(startIndex, startIndex + count)))
+          .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
     }
 
     PCollection<BeamSQLRow> orderedStream = rawStream.apply(
         "flatten", Flatten.<BeamSQLRow>iterables());
+    orderedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+
     return orderedStream;
   }
 

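The LIMIT/OFFSET strategy above, restated as a standalone sketch (`offset`,
`fetch`, `comparator`, and `rows` are placeholders): take the top
(offset + fetch) elements globally, then strip the first `offset` of them:

    PCollection<List<BeamSQLRow>> top = rows
        .apply("extractTopOffsetAndFetch",
            Top.of(offset + fetch, comparator).withoutDefaults())
        .setCoder(ListCoder.<BeamSQLRow>of(rows.getCoder()));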
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index 4fbe7ec..ea59906 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -23,13 +23,14 @@ import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
 import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.core.Values;
@@ -56,8 +57,8 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(
-      BeamPipelineCreator planCreator) throws Exception {
+  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+      throws Exception {
     List<BeamSQLRow> rows = new ArrayList<>(tuples.size());
     String stageName = BeamSQLRelUtils.getStageName(this);
     if (tuples.isEmpty()) {
@@ -73,6 +74,7 @@ public class BeamValuesRel extends Values implements BeamRelNode {
       rows.add(row);
     }
 
-    return planCreator.getPipeline().apply(stageName, Create.of(rows));
+    return inputPCollections.getPipeline().apply(stageName, Create.of(rows))
+        .setCoder(new BeamSqlRowCoder(beamSQLRecordType));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
index 94531f0..e8fa82f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
@@ -21,7 +21,10 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -33,6 +36,9 @@ public class BeamSQLRecordType implements Serializable {
   private List<String> fieldsName = new ArrayList<>();
   private List<SqlTypeName> fieldsType = new ArrayList<>();
 
+  /**
+   * Generate a {@link BeamSQLRecordType} from a {@link RelDataType} which is used to create a table.
+   */
   public static BeamSQLRecordType from(RelDataType tableInfo) {
     BeamSQLRecordType record = new BeamSQLRecordType();
     for (RelDataTypeField f : tableInfo.getFieldList()) {
@@ -47,6 +53,22 @@ public class BeamSQLRecordType implements Serializable {
     fieldsType.add(fieldType);
   }
 
+  /**
+   * Create a {@link RelProtoDataType} instance, so it can be used to create a table.
+   */
+  public RelProtoDataType toRelDataType() {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < fieldsName.size(); ++idx) {
+          builder.add(fieldsName.get(idx), fieldsType.get(idx));
+        }
+        return builder.build();
+      }
+    };
+  }
+
   public int size() {
     return fieldsName.size();
   }

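A short round-trip sketch for the new toRelDataType() hook (field names are
illustrative only):

    BeamSQLRecordType type = new BeamSQLRecordType();
    type.addField("order_id", SqlTypeName.BIGINT);
    type.addField("price", SqlTypeName.DOUBLE);
    RelProtoDataType proto = type.toRelDataType();
    // proto.apply(typeFactory) materializes the RelDataType lazily,
    // at the point where the table is registered in a Calcite schema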
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
deleted file mode 100644
index b88a195..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordTypeCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * A {@link Coder} for {@link BeamSQLRecordType}.
- *
- */
-public class BeamSQLRecordTypeCoder extends StandardCoder<BeamSQLRecordType> {
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final VarIntCoder intCoder = VarIntCoder.of();
-
-  private static final BeamSQLRecordTypeCoder INSTANCE = new BeamSQLRecordTypeCoder();
-  private BeamSQLRecordTypeCoder(){}
-
-  public static BeamSQLRecordTypeCoder of() {
-    return INSTANCE;
-  }
-
-  @Override
-  public void encode(BeamSQLRecordType value, OutputStream outStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    Context nested = context.nested();
-    intCoder.encode(value.size(), outStream, nested);
-    for (String fieldName : value.getFieldsName()) {
-      stringCoder.encode(fieldName, outStream, nested);
-    }
-    for (SqlTypeName fieldType : value.getFieldsType()) {
-      stringCoder.encode(fieldType.name(), outStream, nested);
-    }
-    //add a dummy field to indicate the end of record
-    intCoder.encode(value.size(), outStream, context);
-  }
-
-  @Override
-  public BeamSQLRecordType decode(InputStream inStream,
-      org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    BeamSQLRecordType typeRecord = new BeamSQLRecordType();
-    int size = intCoder.decode(inStream, context.nested());
-    for (int idx = 0; idx < size; ++idx) {
-      typeRecord.getFieldsName().add(stringCoder.decode(inStream, context.nested()));
-    }
-    for (int idx = 0; idx < size; ++idx) {
-      typeRecord.getFieldsType().add(
-          SqlTypeName.valueOf(stringCoder.decode(inStream, context.nested())));
-    }
-    intCoder.decode(inStream, context);
-    return typeRecord;
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
-  }
-
-  @Override
-  public void verifyDeterministic()
-      throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 0bfe467..f161d27 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -23,24 +23,22 @@ import java.io.OutputStream;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
-
 import org.apache.beam.sdk.coders.BigDecimalCoder;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
- * A {@link Coder} encodes {@link BeamSQLRow}.
- *
+ * A {@link Coder} that encodes {@link BeamSQLRow}.
  */
-public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
-  private static final BeamSQLRecordTypeCoder recordTypeCoder = BeamSQLRecordTypeCoder.of();
+public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
+  private BeamSQLRecordType tableSchema;
 
   private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
 
@@ -51,17 +49,13 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
   private static final InstantCoder instantCoder = InstantCoder.of();
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
 
-  private static final BeamSqlRowCoder INSTANCE = new BeamSqlRowCoder();
-  private BeamSqlRowCoder(){}
-
-  public static BeamSqlRowCoder of() {
-    return INSTANCE;
+  public BeamSqlRowCoder(BeamSQLRecordType tableSchema) {
+    this.tableSchema = tableSchema;
   }
 
   @Override
   public void encode(BeamSQLRow value, OutputStream outStream,
       org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, IOException {
-    recordTypeCoder.encode(value.getDataType(), outStream, context.nested());
     listCoder.encode(value.getNullFields(), outStream, context.nested());
 
     for (int idx = 0; idx < value.size(); ++idx) {
@@ -115,18 +109,17 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
   @Override
   public BeamSQLRow decode(InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
       throws CoderException, IOException {
-    BeamSQLRecordType type = recordTypeCoder.decode(inStream, context.nested());
     List<Integer> nullFields = listCoder.decode(inStream, context.nested());
 
-    BeamSQLRow record = new BeamSQLRow(type);
+    BeamSQLRow record = new BeamSQLRow(tableSchema);
     record.setNullFields(nullFields);
 
-    for (int idx = 0; idx < type.size(); ++idx) {
+    for (int idx = 0; idx < tableSchema.size(); ++idx) {
       if (nullFields.contains(idx)) {
         continue;
       }
 
-      switch (type.getFieldsType().get(idx)) {
+      switch (tableSchema.getFieldsType().get(idx)) {
         case INTEGER:
           record.addField(idx, intCoder.decode(inStream, context.nested()));
           break;
@@ -162,7 +155,7 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
           break;
 
         default:
-          throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
+          throw new UnsupportedDataTypeException(tableSchema.getFieldsType().get(idx));
       }
     }
 
@@ -172,15 +165,12 @@ public class BeamSqlRowCoder extends StandardCoder<BeamSQLRow>{
     return record;
   }
 
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return null;
+  public BeamSQLRecordType getTableSchema() {
+    return tableSchema;
   }
 
   @Override
   public void verifyDeterministic()
       throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
-
   }
-
 }

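Since the schema no longer travels with every encoded row, the coder has to be
constructed with the schema up front. A minimal sketch (the schema and the
`rows` collection are hypothetical):

    BeamSQLRecordType schema = new BeamSQLRecordType();
    schema.addField("name", SqlTypeName.VARCHAR);
    rows.setCoder(new BeamSqlRowCoder(schema));  // rows: PCollection<BeamSQLRow>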
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 0d9d147..03f7705 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -21,7 +21,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -38,13 +38,11 @@ import org.junit.BeforeClass;
  *
  */
 public class BasePlanner {
-  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
-
   @BeforeClass
   public static void prepareClass() {
-    runner.addTableMetadata("ORDER_DETAILS", getTable());
-    runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
-    runner.addTableMetadata("SUB_ORDER_RAM", getTable());
+    BeamSqlEnv.registerTable("ORDER_DETAILS", getTable());
+    BeamSqlEnv.registerTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    BeamSqlEnv.registerTable("SUB_ORDER_RAM", getTable());
   }
 
   private static BaseBeamTable getTable() {

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
index 98d14c3..4ea0662 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
 import org.junit.Test;
 
@@ -33,7 +35,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
   public void testSimpleGroupExplain() throws Exception {
     String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 ";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -43,7 +45,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
   public void testSimpleGroup2Explain() throws Exception {
     String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -54,7 +56,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -66,7 +68,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
         + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -77,7 +79,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -88,7 +90,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 
   /**
@@ -96,9 +98,9 @@ public class BeamGroupByExplainTest extends BasePlanner {
    */
   @Test
   public void testUdf() throws Exception {
-    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+    BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
     String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
 
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index 5101c98..0436ca1 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -37,7 +39,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroupExplain() throws Exception {
     String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 ";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -47,7 +49,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroup2Explain() throws Exception {
     String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -58,7 +60,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -70,7 +72,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
         + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -81,7 +83,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -92,7 +94,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   /**
@@ -100,10 +102,10 @@ public class BeamGroupByPipelineTest extends BasePlanner {
    */
   @Test
   public void testUdf() throws Exception {
-    runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
+    BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative");
     String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS";
 
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
index 72b5bf7..946a9fd 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -36,7 +37,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
   public void testTumble2Explain() throws Exception {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   @Test(expected = ValidationException.class)
@@ -44,7 +45,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
     String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index ffc3e01..a296eec 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -22,7 +22,8 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -43,15 +44,14 @@ import org.junit.Test;
  */
 public class BeamPlannerAggregationSubmitTest {
   public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
 
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepareClass() throws ParseException {
-    runner.addTableMetadata("ORDER_DETAILS", getOrderTable());
-    runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable());
+    BeamSqlEnv.registerTable("ORDER_DETAILS", getOrderTable());
+    BeamSqlEnv.registerTable("ORDER_SUMMARY", getSummaryTable());
   }
 
   @Before
@@ -120,7 +120,7 @@ public class BeamPlannerAggregationSubmitTest {
         + "WHERE SITE_ID = 1 " + "GROUP BY site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
 
     pipeline.run().waitUntilFinish();
 
@@ -137,7 +137,7 @@ public class BeamPlannerAggregationSubmitTest {
         + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
 
     pipeline.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 1355d5d..e617ff2 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,7 +29,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
   @Test
   public void selectAll() throws Exception {
     String sql = "SELECT * FROM ORDER_DETAILS";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
 
     String expectedPlan =
         "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
@@ -40,7 +41,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
   public void selectWithFilter() throws Exception {
     String sql = "SELECT " + " order_id, site_id, price " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
 
     String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
         + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
@@ -53,7 +54,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
     String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
         + " order_id, site_id, price " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.executionPlan(sql);
+    String plan = BeamSqlCli.explainQuery(sql);
 
     String expectedPlan =
         "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 7219d11..8a48618 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -44,7 +45,7 @@ public class BeamPlannerSubmitTest extends BasePlanner {
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
 
-    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
 
     pipeline.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index 4935c3b..a44b0d9 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -22,7 +22,8 @@ import java.util.Collection;
 import java.util.Date;
 import java.util.Iterator;
 
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -37,7 +38,6 @@ import org.junit.Test;
  * Test for {@code BeamSortRel}.
  */
 public class BeamSortRelTest {
-  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
 
@@ -71,7 +71,7 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc, site_id desc limit 4";
 
     System.out.println(sql);
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -88,7 +88,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsFirst() throws Exception {
-    runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
+    BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -98,7 +98,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
+    BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -108,7 +108,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -126,7 +126,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsLast() throws Exception {
-    runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
+    BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -136,7 +136,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
+    BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -146,7 +146,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -169,7 +169,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -192,7 +192,7 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
     pipeline.run().waitUntilFinish();
 
     assertEquals(
@@ -223,13 +223,13 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc limit 11";
 
     TestPipeline pipeline = TestPipeline.create();
-    runner.compileBeamPipeline(sql, pipeline);
+    BeamSqlCli.compilePipeline(sql, pipeline);
   }
 
   @Before
   public void prepare() {
-    runner.addTableMetadata("ORDER_DETAILS", orderDetailTable);
-    runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable);
+    BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable);
+    BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable);
     MockedBeamSQLTable.CONTENT.clear();
   }