You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/06/12 14:48:41 UTC

[4/5] beam git commit: rename SQL to Sql in class name

rename SQL to Sql in class name


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/743f0b3b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/743f0b3b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/743f0b3b

Branch: refs/heads/DSL_SQL
Commit: 743f0b3b62616d1c8fe77d306920fbcce81c8855
Parents: 5c1f2cb
Author: mingmxu <mi...@ebay.com>
Authored: Fri Jun 9 23:07:00 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Mon Jun 12 07:47:40 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/dsls/sql/BeamSql.java  |  20 +-
 .../org/apache/beam/dsls/sql/BeamSqlCli.java    |   8 +-
 .../beam/dsls/sql/example/BeamSqlExample.java   |  16 +-
 .../interpreter/BeamSQLExpressionExecutor.java  |  43 ---
 .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 235 ------------
 .../interpreter/BeamSqlExpressionExecutor.java  |  43 +++
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java | 235 ++++++++++++
 .../operator/BeamSqlAndExpression.java          |   4 +-
 .../operator/BeamSqlCaseExpression.java         |   4 +-
 .../operator/BeamSqlCompareExpression.java      |   4 +-
 .../interpreter/operator/BeamSqlExpression.java |   8 +-
 .../operator/BeamSqlInputRefExpression.java     |   4 +-
 .../operator/BeamSqlIsNotNullExpression.java    |   4 +-
 .../operator/BeamSqlIsNullExpression.java       |   4 +-
 .../operator/BeamSqlOrExpression.java           |   4 +-
 .../interpreter/operator/BeamSqlPrimitive.java  |   6 +-
 .../operator/BeamSqlUdfExpression.java          |   4 +-
 .../operator/BeamSqlWindowEndExpression.java    |   4 +-
 .../operator/BeamSqlWindowExpression.java       |   4 +-
 .../operator/BeamSqlWindowStartExpression.java  |   4 +-
 .../arithmetic/BeamSqlArithmeticExpression.java |   6 +-
 .../math/BeamSqlMathUnaryExpression.java        |   4 +-
 .../string/BeamSqlCharLengthExpression.java     |   4 +-
 .../string/BeamSqlConcatExpression.java         |   4 +-
 .../string/BeamSqlInitCapExpression.java        |   4 +-
 .../operator/string/BeamSqlLowerExpression.java |   4 +-
 .../string/BeamSqlOverlayExpression.java        |   4 +-
 .../string/BeamSqlPositionExpression.java       |   4 +-
 .../string/BeamSqlSubstringExpression.java      |   4 +-
 .../operator/string/BeamSqlTrimExpression.java  |   4 +-
 .../operator/string/BeamSqlUpperExpression.java |   4 +-
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |   4 +-
 .../beam/dsls/sql/planner/BeamSQLRelUtils.java  |  74 ----
 .../beam/dsls/sql/planner/BeamSqlRelUtils.java  |  74 ++++
 .../beam/dsls/sql/rel/BeamAggregationRel.java   |  58 +--
 .../apache/beam/dsls/sql/rel/BeamFilterRel.java |  28 +-
 .../apache/beam/dsls/sql/rel/BeamIOSinkRel.java |  12 +-
 .../beam/dsls/sql/rel/BeamIOSourceRel.java      |  14 +-
 .../beam/dsls/sql/rel/BeamProjectRel.java       |  28 +-
 .../apache/beam/dsls/sql/rel/BeamRelNode.java   |   4 +-
 .../apache/beam/dsls/sql/rel/BeamSortRel.java   |  32 +-
 .../apache/beam/dsls/sql/rel/BeamValuesRel.java |  16 +-
 .../beam/dsls/sql/schema/BaseBeamTable.java     |  10 +-
 .../dsls/sql/schema/BeamPCollectionTable.java   |  10 +-
 .../beam/dsls/sql/schema/BeamSQLRecordType.java |  97 -----
 .../apache/beam/dsls/sql/schema/BeamSQLRow.java | 367 -------------------
 .../beam/dsls/sql/schema/BeamSqlRecordType.java |  96 +++++
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java | 367 +++++++++++++++++++
 .../beam/dsls/sql/schema/BeamSqlRowCoder.java   |  16 +-
 .../beam/dsls/sql/schema/BeamTableUtils.java    |  10 +-
 .../sql/schema/kafka/BeamKafkaCSVTable.java     |  42 +--
 .../dsls/sql/schema/kafka/BeamKafkaTable.java   |  16 +-
 .../dsls/sql/schema/text/BeamTextCSVTable.java  |   6 +-
 .../schema/text/BeamTextCSVTableIOReader.java   |  18 +-
 .../schema/text/BeamTextCSVTableIOWriter.java   |  20 +-
 .../transform/BeamAggregationTransforms.java    |  56 +--
 .../dsls/sql/transform/BeamSQLFilterFn.java     |  62 ----
 .../sql/transform/BeamSQLOutputToConsoleFn.java |  41 ---
 .../dsls/sql/transform/BeamSQLProjectFn.java    |  72 ----
 .../dsls/sql/transform/BeamSqlFilterFn.java     |  62 ++++
 .../sql/transform/BeamSqlOutputToConsoleFn.java |  41 +++
 .../dsls/sql/transform/BeamSqlProjectFn.java    |  72 ++++
 .../beam/dsls/sql/transform/package-info.java   |   2 +-
 .../sql/interpreter/BeamSQLFnExecutorTest.java  | 268 --------------
 .../interpreter/BeamSQLFnExecutorTestBase.java  |  91 -----
 .../sql/interpreter/BeamSqlFnExecutorTest.java  | 268 ++++++++++++++
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  91 +++++
 .../operator/BeamNullExperssionTest.java        |   4 +-
 .../operator/BeamSqlAndOrExpressionTest.java    |   4 +-
 .../operator/BeamSqlCaseExpressionTest.java     |   4 +-
 .../operator/BeamSqlCompareExpressionTest.java  |   4 +-
 .../operator/BeamSqlInputRefExpressionTest.java |   4 +-
 .../operator/BeamSqlPrimitiveTest.java          |   4 +-
 .../operator/BeamSqlUdfExpressionTest.java      |   4 +-
 .../BeamSqlArithmeticExpressionTest.java        |   4 +-
 .../math/BeamSqlMathUnaryExpressionTest.java    |   4 +-
 .../string/BeamSqlCharLengthExpressionTest.java |   4 +-
 .../string/BeamSqlConcatExpressionTest.java     |   4 +-
 .../string/BeamSqlInitCapExpressionTest.java    |   4 +-
 .../string/BeamSqlLowerExpressionTest.java      |   4 +-
 .../string/BeamSqlOverlayExpressionTest.java    |   4 +-
 .../string/BeamSqlPositionExpressionTest.java   |   4 +-
 .../string/BeamSqlSubstringExpressionTest.java  |   4 +-
 .../string/BeamSqlTrimExpressionTest.java       |   4 +-
 .../string/BeamSqlUpperExpressionTest.java      |   4 +-
 .../beam/dsls/sql/planner/BasePlanner.java      |  16 +-
 .../sql/planner/BeamGroupByPipelineTest.java    |  16 +-
 .../sql/planner/BeamInvalidGroupByTest.java     |   6 +-
 .../BeamPlannerAggregationSubmitTest.java       |  28 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  10 +-
 .../dsls/sql/planner/MockedBeamSQLTable.java    | 161 --------
 .../dsls/sql/planner/MockedBeamSqlTable.java    | 161 ++++++++
 .../beam/dsls/sql/rel/BeamSortRelTest.java      |  44 +--
 .../beam/dsls/sql/rel/BeamValuesRelTest.java    |  22 +-
 .../sql/schema/BeamPCollectionTableTest.java    |   6 +-
 .../dsls/sql/schema/BeamSqlRowCoderTest.java    |   4 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |  16 +-
 .../sql/schema/text/BeamTextCSVTableTest.java   |  18 +-
 .../transform/BeamAggregationTransformTest.java |  56 +--
 .../schema/transform/BeamTransformBaseTest.java |  22 +-
 100 files changed, 1953 insertions(+), 1954 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
index 8c2c5ad..809fed3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSql.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BeamPCollectionTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -76,14 +76,14 @@ public class BeamSql {
    * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
    *
    * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
-   * all the input tables and results in a {@code PCollection<BeamSQLRow>} representing the output
+   * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
    * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
-   * {@code PCollection<BeamSQLRow>}, each representing an input table.
+   * {@code PCollection<BeamSqlRow>}, each representing an input table.
    *
    * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
    * referenced within the query.
    */
-  public static PTransform<PCollectionTuple, PCollection<BeamSQLRow>> query(String sqlQuery) {
+  public static PTransform<PCollectionTuple, PCollection<BeamSqlRow>> query(String sqlQuery) {
     return new QueryTransform(sqlQuery);
 
   }
@@ -94,7 +94,7 @@ public class BeamSql {
    * <p>This is a simplified form of {@link #query(String)} where the query must reference
    * a single input table.
    */
-  public static PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>>
+  public static PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>>
   simpleQuery(String sqlQuery) throws Exception {
     return new SimpleQueryTransform(sqlQuery);
   }
@@ -102,14 +102,14 @@ public class BeamSql {
   /**
    * A {@link PTransform} representing an execution plan for a SQL query.
    */
-  public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSQLRow>> {
+  public static class QueryTransform extends PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
     private String sqlQuery;
     public QueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
     }
 
     @Override
-    public PCollection<BeamSQLRow> expand(PCollectionTuple input) {
+    public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
       BeamRelNode beamRelNode = null;
       try {
         beamRelNode = BeamSqlEnv.planner.convertToBeamRel(sqlQuery);
@@ -130,7 +130,7 @@ public class BeamSql {
    * a single table.
    */
   public static class SimpleQueryTransform
-      extends PTransform<PCollection<BeamSQLRow>, PCollection<BeamSQLRow>> {
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
     private String sqlQuery;
     public SimpleQueryTransform(String sqlQuery) {
       this.sqlQuery = sqlQuery;
@@ -141,7 +141,7 @@ public class BeamSql {
     }
 
     @Override
-    public PCollection<BeamSQLRow> expand(PCollection<BeamSQLRow> input) {
+    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
       SqlNode sqlNode;
       try {
         sqlNode = BeamSqlEnv.planner.parseQuery(sqlQuery);
@@ -156,7 +156,7 @@ public class BeamSql {
         String tableName = select.getFrom().toString();
         BeamSqlEnv.registerTable(tableName,
             new BeamPCollectionTable(input, inputCoder.getTableSchema().toRelDataType()));
-        return PCollectionTuple.of(new TupleTag<BeamSQLRow>(tableName), input)
+        return PCollectionTuple.of(new TupleTag<BeamSqlRow>(tableName), input)
             .apply(BeamSql.query(sqlQuery));
       } else {
         throw new BeamSqlUnsupportedException(sqlNode.toString());

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
index 6591589..a55f655 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlCli.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql;
 
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -48,7 +48,7 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement) throws Exception{
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement) throws Exception{
     PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
         .as(PipelineOptions.class); // FlinkPipelineOptions.class
     options.setJobName("BeamPlanCreator");
@@ -60,9 +60,9 @@ public class BeamSqlCli {
   /**
    * compile SQL, and return a {@link Pipeline}.
    */
-  public static PCollection<BeamSQLRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, Pipeline basePipeline)
       throws Exception{
-    PCollection<BeamSQLRow> resultStream =
+    PCollection<BeamSqlRow> resultStream =
         BeamSqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline);
     return resultStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 6a1b81d..4d7328e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -18,8 +18,8 @@
 package org.apache.beam.dsls.sql.example;
 
 import org.apache.beam.dsls.sql.BeamSql;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -45,28 +45,28 @@ public class BeamSqlExample {
     Pipeline p = Pipeline.create(options);
 
     //define the input row format
-    BeamSQLRecordType type = new BeamSQLRecordType();
+    BeamSqlRecordType type = new BeamSqlRecordType();
     type.addField("c1", SqlTypeName.INTEGER);
     type.addField("c2", SqlTypeName.VARCHAR);
     type.addField("c3", SqlTypeName.DOUBLE);
-    BeamSQLRow row = new BeamSQLRow(type);
+    BeamSqlRow row = new BeamSqlRow(type);
     row.addField(0, 1);
     row.addField(1, "row");
     row.addField(2, 1.0);
 
     //create a source PCollection with Create.of();
-    PCollection<BeamSQLRow> inputTable = PBegin.in(p).apply(Create.of(row)
+    PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
         .withCoder(new BeamSqlRowCoder(type)));
 
     //run a simple SQL query over input PCollection;
     String sql = "select c2, c3 from TABLE_A where c1=1";
-    PCollection<BeamSQLRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
+    PCollection<BeamSqlRow> outputStream = inputTable.apply(BeamSql.simpleQuery(sql));
 
     //log out the output record;
     outputStream.apply("log_result",
-        MapElements.<BeamSQLRow, Void>via(new SimpleFunction<BeamSQLRow, Void>() {
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
       @Override
-      public Void apply(BeamSQLRow input) {
+      public Void apply(BeamSqlRow input) {
         LOG.info(input.valueInString());
         return null;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
deleted file mode 100644
index 1285280..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLExpressionExecutor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-
-/**
- * {@code BeamSQLExpressionExecutor} fills the gap between relational
- * expressions in Calcite SQL and executable code.
- *
- */
-public interface BeamSQLExpressionExecutor extends Serializable {
-
-  /**
-   * invoked before data processing.
-   */
-  void prepare();
-
-  /**
-   * apply transformation to input record {@link BeamSQLRow}.
-   *
-   */
-  List<Object> execute(BeamSQLRow inputRecord);
-
-  void close();
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
deleted file mode 100644
index 51fe2c9..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexInputRef;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
-import org.apache.calcite.util.NlsString;
-
-/**
- * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
- * {@code BeamSQLFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
- * which can be evaluated against the {@link BeamSQLRow}.
- *
- */
-public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor {
-  protected List<BeamSqlExpression> exps;
-
-  public BeamSQLFnExecutor(BeamRelNode relNode) {
-    this.exps = new ArrayList<>();
-    if (relNode instanceof BeamFilterRel) {
-      BeamFilterRel filterNode = (BeamFilterRel) relNode;
-      RexNode condition = filterNode.getCondition();
-      exps.add(buildExpression(condition));
-    } else if (relNode instanceof BeamProjectRel) {
-      BeamProjectRel projectNode = (BeamProjectRel) relNode;
-      List<RexNode> projects = projectNode.getProjects();
-      for (RexNode rexNode : projects) {
-        exps.add(buildExpression(rexNode));
-      }
-    } else {
-      throw new BeamSqlUnsupportedException(
-          String.format("%s is not supported yet", relNode.getClass().toString()));
-    }
-  }
-
-  /**
-   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
-   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
-   */
-  static BeamSqlExpression buildExpression(RexNode rexNode) {
-    if (rexNode instanceof RexLiteral) {
-      RexLiteral node = (RexLiteral) rexNode;
-      // NlsString is not serializable, we need to convert
-      // it to string explicitly.
-      if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
-          && node.getValue() instanceof NlsString) {
-        return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue());
-      } else {
-        return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
-      }
-    } else if (rexNode instanceof RexInputRef) {
-      RexInputRef node = (RexInputRef) rexNode;
-      return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
-    } else if (rexNode instanceof RexCall) {
-      RexCall node = (RexCall) rexNode;
-      String opName = node.op.getName();
-      List<BeamSqlExpression> subExps = new ArrayList<>();
-      for (RexNode subNode : node.getOperands()) {
-        subExps.add(buildExpression(subNode));
-      }
-      switch (opName) {
-        case "AND":
-        return new BeamSqlAndExpression(subExps);
-        case "OR":
-          return new BeamSqlOrExpression(subExps);
-
-        case "=":
-          return new BeamSqlEqualExpression(subExps);
-        case "<>=":
-          return new BeamSqlNotEqualExpression(subExps);
-        case ">":
-          return new BeamSqlLargerThanExpression(subExps);
-        case ">=":
-          return new BeamSqlLargerThanEqualExpression(subExps);
-        case "<":
-          return new BeamSqlLessThanExpression(subExps);
-        case "<=":
-          return new BeamSqlLessThanEqualExpression(subExps);
-
-        // arithmetic operators
-        case "+":
-          return new BeamSqlPlusExpression(subExps);
-        case "-":
-          return new BeamSqlMinusExpression(subExps);
-        case "*":
-          return new BeamSqlMultiplyExpression(subExps);
-        case "/":
-          return new BeamSqlDivideExpression(subExps);
-        case "MOD":
-          return new BeamSqlModExpression(subExps);
-
-        case "ABS":
-          return new BeamSqlAbsExpression(subExps);
-        case "SQRT":
-          return new BeamSqlSqrtExpression(subExps);
-
-        // string operators
-        case "||":
-          return new BeamSqlConcatExpression(subExps);
-        case "POSITION":
-          return new BeamSqlPositionExpression(subExps);
-        case "CHAR_LENGTH":
-        case "CHARACTER_LENGTH":
-          return new BeamSqlCharLengthExpression(subExps);
-        case "UPPER":
-          return new BeamSqlUpperExpression(subExps);
-        case "LOWER":
-          return new BeamSqlLowerExpression(subExps);
-        case "TRIM":
-          return new BeamSqlTrimExpression(subExps);
-        case "SUBSTRING":
-          return new BeamSqlSubstringExpression(subExps);
-        case "OVERLAY":
-          return new BeamSqlOverlayExpression(subExps);
-        case "INITCAP":
-          return new BeamSqlInitCapExpression(subExps);
-
-        case "CASE":
-          return new BeamSqlCaseExpression(subExps);
-
-        case "IS NULL":
-          return new BeamSqlIsNullExpression(subExps.get(0));
-      case "IS NOT NULL":
-        return new BeamSqlIsNotNullExpression(subExps.get(0));
-
-      case "HOP":
-      case "TUMBLE":
-      case "SESSION":
-        return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
-      case "HOP_START":
-      case "TUMBLE_START":
-      case "SESSION_START":
-        return new BeamSqlWindowStartExpression();
-      case "HOP_END":
-      case "TUMBLE_END":
-      case "SESSION_END":
-        return new BeamSqlWindowEndExpression();
-      default:
-        //handle UDF
-        if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
-          SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
-          ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
-          return new BeamSqlUdfExpression(fn.method, subExps,
-              ((RexCall) rexNode).type.getSqlTypeName());
-        } else {
-          throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
-        }
-      }
-    } else {
-      throw new BeamSqlUnsupportedException(
-          String.format("%s is not supported yet", rexNode.getClass().toString()));
-    }
-  }
-
-  @Override
-  public void prepare() {
-  }
-
-  @Override
-  public List<Object> execute(BeamSQLRow inputRecord) {
-    List<Object> results = new ArrayList<>();
-    for (BeamSqlExpression exp : exps) {
-      results.add(exp.evaluate(inputRecord).getValue());
-    }
-    return results;
-  }
-
-  @Override
-  public void close() {
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..a314bf4
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+  /**
+   * invoked before data processing.
+   */
+  void prepare();
+
+  /**
+   * apply transformation to input record {@link BeamSqlRow}.
+   *
+   */
+  List<Object> execute(BeamSqlRow inputRecord);
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..1d1dfc1
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNotNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlIsNullExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLargerThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+  protected List<BeamSqlExpression> exps;
+
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      // NlsString is not serializable, we need to convert
+      // it to string explicitly.
+      if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
+          && node.getValue() instanceof NlsString) {
+        return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) node.getValue()).getValue());
+      } else {
+        return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      return new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        case "AND":
+        return new BeamSqlAndExpression(subExps);
+        case "OR":
+          return new BeamSqlOrExpression(subExps);
+
+        case "=":
+          return new BeamSqlEqualExpression(subExps);
+        case "<>=":
+          return new BeamSqlNotEqualExpression(subExps);
+        case ">":
+          return new BeamSqlLargerThanExpression(subExps);
+        case ">=":
+          return new BeamSqlLargerThanEqualExpression(subExps);
+        case "<":
+          return new BeamSqlLessThanExpression(subExps);
+        case "<=":
+          return new BeamSqlLessThanEqualExpression(subExps);
+
+        // arithmetic operators
+        case "+":
+          return new BeamSqlPlusExpression(subExps);
+        case "-":
+          return new BeamSqlMinusExpression(subExps);
+        case "*":
+          return new BeamSqlMultiplyExpression(subExps);
+        case "/":
+          return new BeamSqlDivideExpression(subExps);
+        case "MOD":
+          return new BeamSqlModExpression(subExps);
+
+        case "ABS":
+          return new BeamSqlAbsExpression(subExps);
+        case "SQRT":
+          return new BeamSqlSqrtExpression(subExps);
+
+        // string operators
+        case "||":
+          return new BeamSqlConcatExpression(subExps);
+        case "POSITION":
+          return new BeamSqlPositionExpression(subExps);
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          return new BeamSqlCharLengthExpression(subExps);
+        case "UPPER":
+          return new BeamSqlUpperExpression(subExps);
+        case "LOWER":
+          return new BeamSqlLowerExpression(subExps);
+        case "TRIM":
+          return new BeamSqlTrimExpression(subExps);
+        case "SUBSTRING":
+          return new BeamSqlSubstringExpression(subExps);
+        case "OVERLAY":
+          return new BeamSqlOverlayExpression(subExps);
+        case "INITCAP":
+          return new BeamSqlInitCapExpression(subExps);
+
+        case "CASE":
+          return new BeamSqlCaseExpression(subExps);
+
+        case "IS NULL":
+          return new BeamSqlIsNullExpression(subExps.get(0));
+      case "IS NOT NULL":
+        return new BeamSqlIsNotNullExpression(subExps.get(0));
+
+      case "HOP":
+      case "TUMBLE":
+      case "SESSION":
+        return new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+      case "HOP_START":
+      case "TUMBLE_START":
+      case "SESSION_START":
+        return new BeamSqlWindowStartExpression();
+      case "HOP_END":
+      case "TUMBLE_END":
+      case "SESSION_END":
+        return new BeamSqlWindowEndExpression();
+      default:
+        //handle UDF
+        if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+          SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+          ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+          return new BeamSqlUdfExpression(fn.method, subExps,
+              ((RexCall) rexNode).type.getSqlTypeName());
+        } else {
+          throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!");
+        }
+      }
+    } else {
+      throw new BeamSqlUnsupportedException(
+          String.format("%s is not supported yet", rexNode.getClass().toString()));
+    }
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  @Override
+  public List<Object> execute(BeamSqlRow inputRecord) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRecord).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
index 55473b5..d7dc7d7 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -45,7 +45,7 @@ public class BeamSqlAndExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
     boolean result = true;
     for (BeamSqlExpression exp : operands) {
       BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
index d108abd..a15c42e 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.List;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -49,7 +49,7 @@ public class BeamSqlCaseExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     for (int i = 0; i < operands.size() - 1; i += 2) {
       if (opValueEvaluated(i, inputRecord)) {
         return BeamSqlPrimitive.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
index bfb798d..d75e13d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.List;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -49,7 +49,7 @@ public abstract class BeamSqlCompareExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
     Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
     Object rightValue = operands.get(1).evaluate(inputRecord).getValue();
     switch (operands.get(0).outputType) {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
index 811e21b..41dac76 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.io.Serializable;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.type.SqlTypeName;
 
@@ -49,7 +49,7 @@ public abstract class BeamSqlExpression implements Serializable{
     return op(idx).getOutputType();
   }
 
-  public <T> T opValueEvaluated(int idx, BeamSQLRow row) {
+  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
     return (T) op(idx).evaluate(row).getValue();
   }
 
@@ -59,10 +59,10 @@ public abstract class BeamSqlExpression implements Serializable{
   public abstract boolean accept();
 
   /**
-   * Apply input record {@link BeamSQLRow} to this expression,
+   * Apply input record {@link BeamSqlRow} to this expression,
    * the output value is wrapped with {@link BeamSqlPrimitive}.
    */
-  public abstract BeamSqlPrimitive evaluate(BeamSQLRow inputRecord);
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRecord);
 
   public List<BeamSqlExpression> getOperands() {
     return operands;

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
index 612108f..3e99caf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.dsls.sql.interpreter.operator;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -38,7 +38,7 @@ public class BeamSqlInputRefExpression extends BeamSqlExpression{
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     return BeamSqlPrimitive.of(outputType, inputRecord.getFieldValue(inputRef));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
index 784584e..e08e737 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNotNullExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -44,7 +44,7 @@ public class BeamSqlIsNotNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
     Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue != null);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
index b09ddbf..d4e070d 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlIsNullExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -44,7 +44,7 @@ public class BeamSqlIsNullExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
     Object leftValue = operands.get(0).evaluate(inputRecord).getValue();
     return BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, leftValue == null);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
index 4d07af8..e47ed45 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlOrExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -45,7 +45,7 @@ public class BeamSqlOrExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Boolean> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Boolean> evaluate(BeamSqlRow inputRecord) {
     boolean result = false;
     for (BeamSqlExpression exp : operands) {
       BeamSqlPrimitive<Boolean> expOut = exp.evaluate(inputRecord);

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index bc18c5e..d1fd886 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -24,13 +24,13 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.util.NlsString;
 
 /**
  * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
- * It holds the value, and return it directly during {@link #evaluate(BeamSQLRow)}.
+ * It holds the value, and return it directly during {@link #evaluate(BeamSqlRow)}.
  *
  */
 public class BeamSqlPrimitive<T> extends BeamSqlExpression{
@@ -141,7 +141,7 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression{
   }
 
   @Override
-  public BeamSqlPrimitive<T> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRecord) {
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
index 389a87e..6f18307 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -51,7 +51,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     if (method == null) {
       reConstructMethod();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
index 96ad81f..8bc090f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Date;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -34,7 +34,7 @@ public class BeamSqlWindowEndExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         new Date(inputRecord.getWindowEnd().getMillis()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
index 2fb9a48..eb4c03b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Date;
 import java.util.List;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -42,7 +42,7 @@ public class BeamSqlWindowExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         (Date) operands.get(0).evaluate(inputRecord).getValue());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
index d0ac260..1e2c0a2 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Date;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -35,7 +35,7 @@ public class BeamSqlWindowStartExpression extends BeamSqlExpression {
   }
 
   @Override
-  public BeamSqlPrimitive<Date> evaluate(BeamSQLRow inputRecord) {
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRecord) {
     return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
         new Date(inputRecord.getWindowStart().getMillis()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
index 5e1d068..69f6f10 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -55,7 +55,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
   /**
    * https://dev.mysql.com/doc/refman/5.7/en/arithmetic-functions.html.
    */
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
     BeamSqlExpression leftOp = operands.get(0);
     BeamSqlExpression rightOp = operands.get(1);
 
@@ -78,7 +78,7 @@ public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
     }
   }
 
-  private double getDouble(BeamSQLRow inputRecord, BeamSqlExpression op) {
+  private double getDouble(BeamSqlRow inputRecord, BeamSqlExpression op) {
     Object raw = op.evaluate(inputRecord).getValue();
     Double ret = null;
     if (SqlTypeName.NUMERIC_TYPES.contains(op.getOutputType())) {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
index e34d4e4..a65333c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpression.java
@@ -21,7 +21,7 @@ package org.apache.beam.dsls.sql.interpreter.operator.math;
 import java.util.List;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 
@@ -45,7 +45,7 @@ public abstract class BeamSqlMathUnaryExpression extends BeamSqlExpression {
     return acceptance;
   }
 
-  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRecord) {
     BeamSqlExpression operand = op(0);
     return calculate(operand.evaluate(inputRecord));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
index 7dbd7f1..3ed9b80 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -33,7 +33,7 @@ public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.INTEGER);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
     return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
index a56e9b1..e8e4e50 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -52,7 +52,7 @@ public class BeamSqlConcatExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String left = opValueEvaluated(0, inputRecord);
     String right = opValueEvaluated(1, inputRecord);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
index 3d0125f..51dfe28 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -33,7 +33,7 @@ public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
 
     StringBuilder ret = new StringBuilder(str);

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
index 1855c65..f70fb1a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -33,7 +33,7 @@ public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
index 73f2591..20d9962 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -55,7 +55,7 @@ public class BeamSqlOverlayExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
     String replaceStr = opValueEvaluated(1, inputRecord);
     int idx = opValueEvaluated(2, inputRecord);

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
index a5e8400..1d09b51 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -57,7 +57,7 @@ public class BeamSqlPositionExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String targetStr = opValueEvaluated(0, inputRecord);
     String containingStr = opValueEvaluated(1, inputRecord);
     int from = -1;

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
index 554a3fc..d9bbc98 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -55,7 +55,7 @@ public class BeamSqlSubstringExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
     int idx = opValueEvaluated(1, inputRecord);
     int startIdx = idx;

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
index d6cad74..d7c8a6a 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -58,7 +58,7 @@ public class BeamSqlTrimExpression extends BeamSqlExpression {
     return true;
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     if (operands.size() == 1) {
       return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
           opValueEvaluated(0, inputRecord).toString().trim());

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
index d58a283..8fcaca4 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpression.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 /**
@@ -33,7 +33,7 @@ public class BeamSqlUpperExpression extends BeamSqlStringUnaryExpression {
     super(operands, SqlTypeName.VARCHAR);
   }
 
-  @Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
     String str = opValueEvaluated(0, inputRecord);
     return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toUpperCase());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 6f148d6..98580cb 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -106,7 +106,7 @@ public class BeamQueryPlanner {
    * which is linked with the given {@code pipeline}. The final output stream is returned as
    * {@code PCollection} so more operations can be applied.
    */
-  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
+  public PCollection<BeamSqlRow> compileBeamPipeline(String sqlStatement, Pipeline basePipeline)
       throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
deleted file mode 100644
index 5e5f215..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSQLRelUtils.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.plan.volcano.RelSubset;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.sql.SqlExplainLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utilities for {@code BeamRelNode}.
- */
-public class BeamSQLRelUtils {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamSQLRelUtils.class);
-
-  private static final AtomicInteger sequence = new AtomicInteger(0);
-  private static final AtomicInteger classSequence = new AtomicInteger(0);
-
-  public static String getStageName(BeamRelNode relNode) {
-    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
-        + sequence.getAndIncrement();
-  }
-
-  public static String getClassName(BeamRelNode relNode) {
-    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
-        + "_" + classSequence.getAndIncrement();
-  }
-
-  public static BeamRelNode getBeamRelInput(RelNode input) {
-    if (input instanceof RelSubset) {
-      // go with known best input
-      input = ((RelSubset) input).getBest();
-    }
-    return (BeamRelNode) input;
-  }
-
-  public static String explain(final RelNode rel) {
-    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
-  }
-
-  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
-    String explain = "";
-    try {
-      explain = RelOptUtil.toString(rel);
-    } catch (StackOverflowError e) {
-      LOG.error("StackOverflowError occurred while extracting plan. "
-          + "Please report it to the dev@ mailing list.");
-      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
-      LOG.error("Forcing plan to empty string and continue... "
-          + "SQL Runner may not working properly after.");
-    }
-    return explain;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
new file mode 100644
index 0000000..d9b6e17
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRelUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.planner;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for {@code BeamRelNode}.
+ */
+public class BeamSqlRelUtils {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamSqlRelUtils.class);
+
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private static final AtomicInteger classSequence = new AtomicInteger(0);
+
+  public static String getStageName(BeamRelNode relNode) {
+    return relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId() + "_"
+        + sequence.getAndIncrement();
+  }
+
+  public static String getClassName(BeamRelNode relNode) {
+    return "Generated_" + relNode.getClass().getSimpleName().toUpperCase() + "_" + relNode.getId()
+        + "_" + classSequence.getAndIncrement();
+  }
+
+  public static BeamRelNode getBeamRelInput(RelNode input) {
+    if (input instanceof RelSubset) {
+      // go with known best input
+      input = ((RelSubset) input).getBest();
+    }
+    return (BeamRelNode) input;
+  }
+
+  public static String explain(final RelNode rel) {
+    return explain(rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+  }
+
+  public static String explain(final RelNode rel, SqlExplainLevel detailLevel) {
+    String explain = "";
+    try {
+      explain = RelOptUtil.toString(rel);
+    } catch (StackOverflowError e) {
+      LOG.error("StackOverflowError occurred while extracting plan. "
+          + "Please report it to the dev@ mailing list.");
+      LOG.error("RelNode " + rel + " ExplainLevel " + detailLevel, e);
+      LOG.error("Forcing plan to empty string and continue... "
+          + "SQL Runner may not working properly after.");
+    }
+    return explain;
+  }
+}