You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:08:59 UTC

[36/59] beam git commit: rename package org.apache.beam.dsls.sql to org.apache.beam.sdk.extensions.sql

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
new file mode 100644
index 0000000..d64ae41
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+
+/**
+ * {@code BeamSql} is the DSL interface of BeamSQL. It translates a SQL query as a
+ * {@link PTransform}, so developers can use standard SQL queries in a Beam pipeline.
+ *
+ * <h1>Beam SQL DSL usage:</h1>
+ * A typical pipeline with Beam SQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options = PipelineOptionsFactory.create();
+Pipeline p = Pipeline.create(options);
+
+//create table from TextIO;
+PCollection<BeamSqlRow> inputTableA = p.apply(TextIO.read().from("/my/input/patha"))
+    .apply(...);
+PCollection<BeamSqlRow> inputTableB = p.apply(TextIO.read().from("/my/input/pathb"))
+    .apply(...);
+
+//run a simple query, and register the output as a table in BeamSql;
+String sql1 = "select MY_FUNC(c1), c2 from PCOLLECTION";
+PCollection<BeamSqlRow> outputTableA = inputTableA.apply(
+    BeamSql.simpleQuery(sql1)
+    .withUdf("MY_FUNC", MY_FUNC.class));
+
+//run a JOIN with one table from TextIO, and one table from another query
+PCollection<BeamSqlRow> outputTableB = PCollectionTuple.of(
+    new TupleTag<BeamSqlRow>("TABLE_O_A"), outputTableA)
+                .and(new TupleTag<BeamSqlRow>("TABLE_B"), inputTableB)
+    .apply(BeamSql.query("select * from TABLE_O_A JOIN TABLE_B where ..."));
+
+//output the final result with TextIO
+outputTableB.apply(...).apply(TextIO.write().to("/my/output/path"));
+
+p.run().waitUntilFinish();
+ * }
+ * </pre>
+ */
+@Experimental
+public class BeamSql {
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>The returned {@link PTransform} can be applied to a {@link PCollectionTuple} representing
+   * all the input tables and results in a {@code PCollection<BeamSqlRow>} representing the output
+   * table. The {@link PCollectionTuple} contains the mapping from {@code table names} to
+   * {@code PCollection<BeamSqlRow>}, each representing an input table.
+   *
+   * <p>It is an error to apply a {@link PCollectionTuple} missing any {@code table names}
+   * referenced within the query.
+   */
+  public static QueryTransform query(String sqlQuery) {
+    return QueryTransform.builder()
+        .setSqlEnv(new BeamSqlEnv())
+        .setSqlQuery(sqlQuery)
+        .build();
+  }
+
+  /**
+   * Transforms a SQL query into a {@link PTransform} representing an equivalent execution plan.
+   *
+   * <p>This is a simplified form of {@link #query(String)} where the query must reference
+   * a single input table: the {@code PCollection<BeamSqlRow>} the transform is applied to.
+   *
+   * <p>The query must refer to that input by the fixed table name <em>PCOLLECTION</em>.
+   *
+   * <p>Building the transform only stores the query text together with a new
+   * {@link BeamSqlEnv}; no checked exception can be raised here, so none is declared
+   * (consistent with {@link #query(String)}).
+   *
+   * @param sqlQuery the SQL statement to translate
+   * @return a transform bound to a newly created {@link BeamSqlEnv}
+   */
+  public static SimpleQueryTransform simpleQuery(String sqlQuery) {
+    return SimpleQueryTransform.builder()
+        .setSqlEnv(new BeamSqlEnv())
+        .setSqlQuery(sqlQuery)
+        .build();
+  }
+
+  /**
+   * A {@link PTransform} that runs a SQL query over the tables carried by a
+   * {@link PCollectionTuple} and produces the result as a {@code PCollection<BeamSqlRow>}.
+   */
+  @AutoValue
+  public abstract static class QueryTransform extends
+      PTransform<PCollectionTuple, PCollection<BeamSqlRow>> {
+    abstract BeamSqlEnv getSqlEnv();
+    abstract String getSqlQuery();
+
+    static Builder builder() {
+      return new AutoValue_BeamSql_QueryTransform.Builder();
+    }
+
+    /**
+     * Builder for {@link QueryTransform}.
+     */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSqlQuery(String sqlQuery);
+      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+      abstract QueryTransform build();
+    }
+
+    /**
+     * Registers a UDF under {@code functionName} in the {@link BeamSqlEnv} of this transform.
+     *
+     * @return this transform, so that calls can be chained
+     */
+    public QueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      BeamSqlEnv env = getSqlEnv();
+      env.registerUdf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Registers a UDAF under {@code functionName} in the {@link BeamSqlEnv} of this transform.
+     *
+     * @return this transform, so that calls can be chained
+     */
+    public QueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+      BeamSqlEnv env = getSqlEnv();
+      env.registerUdaf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Registers every input as a table, plans the query, and builds the Beam pipeline for it.
+     *
+     * @throws IllegalStateException wrapping any failure raised while planning the query or
+     *     while building the pipeline from the plan
+     */
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollectionTuple input) {
+      registerTables(input);
+      BeamRelNode plan = planQuery();
+
+      try {
+        return plan.buildBeamPipeline(input, getSqlEnv());
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    // Converts the SQL text to a BeamRelNode; planner failures surface as IllegalStateException.
+    private BeamRelNode planQuery() {
+      try {
+        return getSqlEnv().planner.convertToBeamRel(getSqlQuery());
+      } catch (ValidationException | RelConversionException | SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
+    // Registers one table per tag of the input tuple; the tag id becomes the table name and
+    // the table schema is taken from the BeamSqlRowCoder of the tagged PCollection.
+    private void registerTables(PCollectionTuple input) {
+      for (TupleTag<?> tag : input.getAll().keySet()) {
+        PCollection<BeamSqlRow> rows = (PCollection<BeamSqlRow>) input.get(tag);
+        BeamSqlRowCoder rowCoder = (BeamSqlRowCoder) rows.getCoder();
+        BeamPCollectionTable table = new BeamPCollectionTable(rows, rowCoder.getTableSchema());
+
+        getSqlEnv().registerTable(tag.getId(), table);
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} representing an execution plan for a SQL query referencing
+   * a single table.
+   *
+   * <p>The input {@code PCollection<BeamSqlRow>} is exposed to the query under the fixed table
+   * name {@code PCOLLECTION}; the actual work is delegated to a {@link QueryTransform}.
+   */
+  @AutoValue
+  public abstract static class SimpleQueryTransform
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<BeamSqlRow>> {
+    private static final String PCOLLECTION_TABLE_NAME = "PCOLLECTION";
+    abstract BeamSqlEnv getSqlEnv();
+    abstract String getSqlQuery();
+
+    static Builder builder() {
+      return new AutoValue_BeamSql_SimpleQueryTransform.Builder();
+    }
+
+    /**
+     * Builder for {@link SimpleQueryTransform}.
+     */
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSqlQuery(String sqlQuery);
+      abstract Builder setSqlEnv(BeamSqlEnv sqlEnv);
+      abstract SimpleQueryTransform build();
+    }
+
+    /**
+     * Registers a UDF under {@code functionName} in the {@link BeamSqlEnv} of this transform.
+     *
+     * @return this transform, so that calls can be chained
+     */
+    public SimpleQueryTransform withUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      getSqlEnv().registerUdf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Registers a UDAF under {@code functionName} in the {@link BeamSqlEnv} of this transform.
+     *
+     * @return this transform, so that calls can be chained
+     */
+    public SimpleQueryTransform withUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+      getSqlEnv().registerUdaf(functionName, clazz);
+      return this;
+    }
+
+    /**
+     * Checks that the query is a plain SELECT whose FROM clause is the fixed table name.
+     *
+     * @throws IllegalStateException if the query cannot be parsed, has no FROM clause, or its
+     *     FROM clause is anything other than {@code PCOLLECTION} (case-insensitive)
+     * @throws UnsupportedOperationException if the parsed statement is not a {@link SqlSelect}
+     */
+    private void validateQuery() {
+      SqlNode sqlNode;
+      try {
+        sqlNode = getSqlEnv().planner.parseQuery(getSqlQuery());
+        // NOTE(review): presumably closes the planner so the same query can be parsed again
+        // when QueryTransform plans it - confirm against BeamQueryPlanner.
+        getSqlEnv().planner.getPlanner().close();
+      } catch (SqlParseException e) {
+        throw new IllegalStateException(e);
+      }
+
+      if (!(sqlNode instanceof SqlSelect)) {
+        throw new UnsupportedOperationException(
+            "Sql operation: " + sqlNode.toString() + " is not supported!");
+      }
+
+      // A SELECT without a FROM clause has no FROM node. Report it like any other query that
+      // does not read from PCOLLECTION rather than failing with a NullPointerException.
+      SqlNode from = ((SqlSelect) sqlNode).getFrom();
+      if (from == null || !from.toString().equalsIgnoreCase(PCOLLECTION_TABLE_NAME)) {
+        throw new IllegalStateException("Use fixed table name " + PCOLLECTION_TABLE_NAME);
+      }
+    }
+
+    /**
+     * Validates the query, then runs it through a {@link QueryTransform} with {@code input}
+     * registered as the table {@code PCOLLECTION}.
+     */
+    @Override
+    public PCollection<BeamSqlRow> expand(PCollection<BeamSqlRow> input) {
+      validateQuery();
+      return PCollectionTuple.of(new TupleTag<BeamSqlRow>(PCOLLECTION_TABLE_NAME), input)
+          .apply(QueryTransform.builder()
+              .setSqlEnv(getSqlEnv())
+              .setSqlQuery(getSqlQuery())
+              .build());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
new file mode 100644
index 0000000..714e102
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+
+/**
+ * {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client.
+ */
+@Experimental
+public class BeamSqlCli {
+  /**
+   * Converts {@code sqlString} with the planner of {@code sqlEnv} and returns a human
+   * readable representation of the resulting execution plan.
+   */
+  public static String explainQuery(String sqlString, BeamSqlEnv sqlEnv) throws Exception {
+    return RelOptUtil.toString(sqlEnv.planner.convertToBeamRel(sqlString));
+  }
+
+  /**
+   * Compiles {@code sqlStatement} into a newly created {@link Pipeline} whose job name is
+   * {@code BeamPlanCreator}, and returns the {@link PCollection} produced by the planner.
+   */
+  public static PCollection<BeamSqlRow> compilePipeline(String sqlStatement, BeamSqlEnv sqlEnv)
+      throws Exception {
+    PipelineOptions pipelineOptions = PipelineOptionsFactory
+        .fromArgs(new String[] {})
+        .withValidation()
+        .as(PipelineOptions.class);
+    pipelineOptions.setJobName("BeamPlanCreator");
+
+    return compilePipeline(sqlStatement, Pipeline.create(pipelineOptions), sqlEnv);
+  }
+
+  /**
+   * Compiles {@code sqlStatement} into {@code basePipeline} and returns the
+   * {@link PCollection} produced by the planner.
+   */
+  public static PCollection<BeamSqlRow> compilePipeline(
+      String sqlStatement, Pipeline basePipeline, BeamSqlEnv sqlEnv) throws Exception {
+    return sqlEnv.planner.compileBeamPipeline(sqlStatement, basePipeline, sqlEnv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
new file mode 100644
index 0000000..ca73b13
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.impl.AggregateFunctionImpl;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.tools.Frameworks;
+
+/**
+ * {@link BeamSqlEnv} prepares the execution context for {@link BeamSql} and {@link BeamSqlCli}.
+ *
+ * <p>It contains a {@link SchemaPlus} which holds the metadata of tables/UDF functions, and
+ * a {@link BeamQueryPlanner} which parse/validate/optimize/translate input SQL queries.
+ */
+public class BeamSqlEnv implements Serializable{
+  transient SchemaPlus schema;
+  transient BeamQueryPlanner planner;
+
+  public BeamSqlEnv() {
+    schema = Frameworks.createRootSchema(true);
+    planner = new BeamQueryPlanner(schema);
+  }
+
+  /**
+   * Register a UDF function which can be used in SQL expression.
+   */
+  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+    schema.add(functionName, ScalarFunctionImpl.create(clazz, BeamSqlUdf.UDF_METHOD));
+  }
+
+  /**
+   * Register a UDAF function which can be used in GROUP-BY expression.
+   * See {@link BeamSqlUdaf} on how to implement a UDAF.
+   */
+  public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
+    schema.add(functionName, AggregateFunctionImpl.create(clazz));
+  }
+
+  /**
+   * Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
+   *
+   */
+  public void registerTable(String tableName, BaseBeamTable table) {
+    schema.add(tableName, new BeamCalciteTable(table.getRowType()));
+    planner.getSourceTables().put(tableName, table);
+  }
+
+  /**
+   * Find {@link BaseBeamTable} by table name.
+   */
+  public BaseBeamTable findTable(String tableName){
+    return planner.getSourceTables().get(tableName);
+  }
+
+  private static class BeamCalciteTable implements ScannableTable, Serializable {
+    private BeamSqlRowType beamSqlRowType;
+    public BeamCalciteTable(BeamSqlRowType beamSqlRowType) {
+      this.beamSqlRowType = beamSqlRowType;
+    }
+    @Override
+    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
+      return CalciteUtils.toCalciteRowType(this.beamSqlRowType)
+          .apply(BeamQueryPlanner.TYPE_FACTORY);
+    }
+
+    @Override
+    public Enumerable<Object[]> scan(DataContext root) {
+      // not used as Beam SQL uses its own execution engine
+      return null;
+    }
+
+    /**
+     * Not used {@link Statistic} to optimize the plan.
+     */
+    @Override
+    public Statistic getStatistic() {
+      return Statistics.UNKNOWN;
+    }
+
+    /**
+     * all sources are treated as TABLE in Beam SQL.
+     */
+    @Override
+    public Schema.TableType getJdbcTableType() {
+      return Schema.TableType.TABLE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
new file mode 100644
index 0000000..21e02a7
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.example;
+
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamSql;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
+ *
+ * <p>Run the example with
+ * <pre>
+ * mvn -pl sdks/java/extensions/sql compile exec:java \
+ *  -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
+ *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
+ * </pre>
+ *
+ */
+class BeamSqlExample {
+  /**
+   * Builds a one-row input table, queries it with {@link BeamSql#simpleQuery(String)}, queries
+   * that result again with {@link BeamSql#query(String)}, prints the rows of both results and
+   * waits for the pipeline to finish.
+   *
+   * @param args command line arguments, parsed into {@link PipelineOptions}
+   */
+  public static void main(String[] args) throws Exception {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
+    Pipeline p = Pipeline.create(options);
+
+    //define the input row format
+    List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
+    List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
+    BeamSqlRowType type = BeamSqlRowType.create(fieldNames, fieldTypes);
+    BeamSqlRow row = new BeamSqlRow(type);
+    row.addField(0, 1);
+    row.addField(1, "row");
+    row.addField(2, 1.0);
+
+    //create a source PCollection with Create.of();
+    PCollection<BeamSqlRow> inputTable = PBegin.in(p).apply(Create.of(row)
+        .withCoder(new BeamSqlRowCoder(type)));
+
+    //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
+    PCollection<BeamSqlRow> outputStream = inputTable.apply(
+        BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+
+    //print the output record of case 1;
+    outputStream.apply("log_result",
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+      @Override
+      public Void apply(BeamSqlRow input) {
+        System.out.println("PCOLLECTION: " + input);
+        return null;
+      }
+    }));
+
+    //Case 2. run the query with BeamSql.query over result PCollection of case 1.
+    PCollection<BeamSqlRow> outputStream2 =
+        PCollectionTuple.of(new TupleTag<BeamSqlRow>("CASE1_RESULT"), outputStream)
+        .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+
+    //print the output record of case 2;
+    //the step gets its own name so that transform names stay unique within the pipeline,
+    //and the label names the table that was actually queried.
+    outputStream2.apply("log_result_2",
+        MapElements.<BeamSqlRow, Void>via(new SimpleFunction<BeamSqlRow, Void>() {
+      @Override
+      public Void apply(BeamSqlRow input) {
+        System.out.println("CASE1_RESULT: " + input);
+        return null;
+      }
+    }));
+
+    p.run().waitUntilFinish();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
new file mode 100644
index 0000000..f156917
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Examples of how to use Beam SQL.
+ *
+ */
+package org.apache.beam.sdk.extensions.sql.example;

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..28f83e4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ *
+ * <p>Implementations are {@link Serializable}. The interface exposes three calls:
+ * {@link #prepare()}, {@link #execute(BeamSqlRow)} and {@link #close()}.
+ * NOTE(review): {@code close()} is undocumented here; presumably it is the counterpart of
+ * {@code prepare()} and is called once processing is done - confirm against implementations.
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+  /**
+   * Invoked before data processing.
+   */
+  void prepare();
+
+  /**
+   * Applies the transformation to the input record {@link BeamSqlRow}.
+   *
+   * @param inputRow the record to evaluate against
+   * @return the resulting values as a list (presumably one entry per evaluated expression -
+   *     TODO confirm against implementations)
+   */
+  List<Object> execute(BeamSqlRow inputRow);
+
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..3084cd5
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ * {@code BeamSqlFnExecutor} converts a {@link BeamRelNode} to a {@link BeamSqlExpression},
+ * which can be evaluated against the {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+  protected List<BeamSqlExpression> exps;
+
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type)
+          && node.getValue() instanceof NlsString) {
+        // NlsString is not serializable, we need to convert
+        // it to string explicitly.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // does this actually make sense?
+        // Calcite actually treat Calendar as the java type of Date Literal
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+      } else {
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //     node.getType().getSqlTypeName() = INTEGER (the display type)
+        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to do a convert here.
+        // check RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE) {
+          Double rawValue = (Double) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        // logical operators
+        case "AND":
+          ret = new BeamSqlAndExpression(subExps);
+          break;
+        case "OR":
+          ret = new BeamSqlOrExpression(subExps);
+          break;
+        case "NOT":
+          ret = new BeamSqlNotExpression(subExps);
+          break;
+        case "=":
+          ret = new BeamSqlEqualsExpression(subExps);
+          break;
+        case "<>":
+          ret = new BeamSqlNotEqualsExpression(subExps);
+          break;
+        case ">":
+          ret = new BeamSqlGreaterThanExpression(subExps);
+          break;
+        case ">=":
+          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+          break;
+        case "<":
+          ret = new BeamSqlLessThanExpression(subExps);
+          break;
+        case "<=":
+          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+          break;
+
+        // arithmetic operators
+        case "+":
+          ret = new BeamSqlPlusExpression(subExps);
+          break;
+        case "-":
+          ret = new BeamSqlMinusExpression(subExps);
+          break;
+        case "*":
+          ret = new BeamSqlMultiplyExpression(subExps);
+          break;
+        case "/":
+        case "/INT":
+          ret = new BeamSqlDivideExpression(subExps);
+          break;
+        case "MOD":
+          ret = new BeamSqlModExpression(subExps);
+          break;
+
+        case "ABS":
+          ret = new BeamSqlAbsExpression(subExps);
+          break;
+        case "ROUND":
+          ret = new BeamSqlRoundExpression(subExps);
+          break;
+        case "LN":
+          ret = new BeamSqlLnExpression(subExps);
+          break;
+        case "LOG10":
+          ret = new BeamSqlLogExpression(subExps);
+          break;
+        case "EXP":
+          ret = new BeamSqlExpExpression(subExps);
+          break;
+        case "ACOS":
+          ret = new BeamSqlAcosExpression(subExps);
+          break;
+        case "ASIN":
+          ret = new BeamSqlAsinExpression(subExps);
+          break;
+        case "ATAN":
+          ret = new BeamSqlAtanExpression(subExps);
+          break;
+        case "COT":
+          ret = new BeamSqlCotExpression(subExps);
+          break;
+        case "DEGREES":
+          ret = new BeamSqlDegreesExpression(subExps);
+          break;
+        case "RADIANS":
+          ret = new BeamSqlRadiansExpression(subExps);
+          break;
+        case "COS":
+          ret = new BeamSqlCosExpression(subExps);
+          break;
+        case "SIN":
+          ret = new BeamSqlSinExpression(subExps);
+          break;
+        case "TAN":
+          ret = new BeamSqlTanExpression(subExps);
+          break;
+        case "SIGN":
+          ret = new BeamSqlSignExpression(subExps);
+          break;
+        case "POWER":
+          ret = new BeamSqlPowerExpression(subExps);
+          break;
+        case "PI":
+          ret = new BeamSqlPiExpression();
+          break;
+        case "ATAN2":
+          ret = new BeamSqlAtan2Expression(subExps);
+          break;
+        case "TRUNCATE":
+          ret = new BeamSqlTruncateExpression(subExps);
+          break;
+        case "RAND":
+          ret = new BeamSqlRandExpression(subExps);
+          break;
+        case "RAND_INTEGER":
+          ret = new BeamSqlRandIntegerExpression(subExps);
+          break;
+
+        // string operators
+        case "||":
+          ret = new BeamSqlConcatExpression(subExps);
+          break;
+        case "POSITION":
+          ret = new BeamSqlPositionExpression(subExps);
+          break;
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          ret = new BeamSqlCharLengthExpression(subExps);
+          break;
+        case "UPPER":
+          ret = new BeamSqlUpperExpression(subExps);
+          break;
+        case "LOWER":
+          ret = new BeamSqlLowerExpression(subExps);
+          break;
+        case "TRIM":
+          ret = new BeamSqlTrimExpression(subExps);
+          break;
+        case "SUBSTRING":
+          ret = new BeamSqlSubstringExpression(subExps);
+          break;
+        case "OVERLAY":
+          ret = new BeamSqlOverlayExpression(subExps);
+          break;
+        case "INITCAP":
+          ret = new BeamSqlInitCapExpression(subExps);
+          break;
+
+        // date functions
+        case "Reinterpret":
+          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+        case "CEIL":
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlCeilExpression(subExps);
+          } else {
+            return new BeamSqlDateCeilExpression(subExps);
+          }
+        case "FLOOR":
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlFloorExpression(subExps);
+          } else {
+            return new BeamSqlDateFloorExpression(subExps);
+          }
+        case "EXTRACT_DATE":
+        case "EXTRACT":
+          return new BeamSqlExtractExpression(subExps);
+
+        case "LOCALTIME":
+        case "CURRENT_TIME":
+          return new BeamSqlCurrentTimeExpression(subExps);
+
+        case "CURRENT_TIMESTAMP":
+        case "LOCALTIMESTAMP":
+          return new BeamSqlCurrentTimestampExpression(subExps);
+
+        case "CURRENT_DATE":
+          return new BeamSqlCurrentDateExpression();
+
+
+        case "CASE":
+          ret = new BeamSqlCaseExpression(subExps);
+          break;
+        case "CAST":
+          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+          break;
+
+        case "IS NULL":
+          ret = new BeamSqlIsNullExpression(subExps.get(0));
+          break;
+        case "IS NOT NULL":
+          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+          break;
+
+        case "HOP":
+        case "TUMBLE":
+        case "SESSION":
+          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+          break;
+        case "HOP_START":
+        case "TUMBLE_START":
+        case "SESSION_START":
+          ret = new BeamSqlWindowStartExpression();
+          break;
+        case "HOP_END":
+        case "TUMBLE_END":
+        case "SESSION_END":
+          ret = new BeamSqlWindowEndExpression();
+          break;
+        default:
+          //handle UDF
+          if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) {
+            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            ret = new BeamSqlUdfExpression(fn.method, subExps,
+              ((RexCall) rexNode).type.getSqlTypeName());
+        } else {
+          throw new UnsupportedOperationException("Operator: " + opName + " is not supported yet!");
+        }
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", rexNode.getClass().toString()));
+    }
+
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
+  }
+
+  @Override
+  public void prepare() {
+  }
+
+  @Override
+  public List<Object> execute(BeamSqlRow inputRow) {
+    List<Object> results = new ArrayList<>();
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRow).getValue());
+    }
+    return results;
+  }
+
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..bfbb33e
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ *  {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL.
+ *
+ * <p>The operands are laid out as {@code when_1, then_1, ..., when_n, then_n, else}.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+  /**
+   * @param operands {@code when}/{@code then} pairs followed by the {@code else} operand;
+   *     must not be empty because the output type is taken from the last operand
+   */
+  public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+    // the return type of CASE is the type of the `else` condition
+    super(operands, operands.get(operands.size() - 1).getOutputType());
+  }
+
+  /**
+   * Accepts an odd number of operands where every {@code when} is BOOLEAN and every
+   * {@code then} has the same type as the {@code else} operand.
+   */
+  @Override public boolean accept() {
+    // `when`-`then` pair + `else`
+    if (operands.size() % 2 != 1) {
+      return false;
+    }
+
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opType(i) != SqlTypeName.BOOLEAN) {
+        return false;
+      } else if (opType(i + 1) != outputType) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Returns the {@code then} value of the first {@code when} that evaluates to TRUE,
+   * or the {@code else} value when none does.
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      // A condition that evaluates to NULL is "not true": fall through to the next
+      // branch instead of failing while unboxing a null Boolean.
+      if (Boolean.TRUE.equals(opValueEvaluated(i, inputRow))) {
+        return BeamSqlPrimitive.of(
+            outputType,
+            opValueEvaluated(i + 1, inputRow)
+        );
+      }
+    }
+    return BeamSqlPrimitive.of(outputType,
+        opValueEvaluated(operands.size() - 1, inputRow));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..08abcc6
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Base class to support 'CAST' operations for all {@link SqlTypeName}.
+ *
+ * <p>The single operand is evaluated and converted to the cast target type, which is
+ * also the output type of this expression.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+  /** Position of the only operand. */
+  private static final int index = 0;
+  /** Pattern used to hand parsed values over to {@link Timestamp#valueOf(String)}. */
+  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+  /** Pattern used to hand parsed values over to {@link Date#valueOf(String)}. */
+  private static final String outputDateFormat = "yyyy-MM-dd";
+  /**
+   * Date and Timestamp formats used to parse
+   * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
+   * Two-digit years are resolved around the pivot year 2020.
+   */
+  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+      .append(null/*printer*/, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+      .withPivotYear(2020);
+
+  /**
+   * @param operands the operand list; {@link #accept()} requires exactly one element
+   * @param castType the target type of the cast
+   */
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  /** A cast takes exactly one operand. */
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  /**
+   * Evaluates the operand against {@code inputRow} and converts it to the target type.
+   *
+   * @throws UnsupportedOperationException if the target type is not handled, or if the
+   *     value cannot be parsed as a date/timestamp
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+      case DOUBLE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+      case SMALLINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+      case TINYINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+      case BIGINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+      case FLOAT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+      case CHAR:
+      case VARCHAR:
+        // both character targets are emitted as VARCHAR primitives
+        return BeamSqlPrimitive
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+      case DATE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+    }
+    throw new UnsupportedOperationException(
+        String.format("Cast to type %s not supported", castOutputType));
+  }
+
+  /**
+   * Parses the string form of {@code inputDate} with {@link #dateTimeFormatter} and
+   * re-renders it with {@code outputFormat} for {@link Date#valueOf(String)}.
+   */
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date
+          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      // keep the parser failure as the cause so the offending input can be diagnosed
+      throw new UnsupportedOperationException("Can't be cast to type 'Date'", e);
+    }
+  }
+
+  /**
+   * Parses the string form of {@code inputTimestamp} with {@link #dateTimeFormatter},
+   * rounds it up to a whole second and re-renders it with {@code outputFormat} for
+   * {@link Timestamp#valueOf(String)}.
+   */
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+              .roundCeilingCopy().toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      // keep the parser failure as the cause so the offending input can be diagnosed
+      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..cb8baac
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ */
+public abstract class BeamSqlExpression implements Serializable {
+  /** Operand expressions; may be {@code null} for expressions without operands. */
+  protected List<BeamSqlExpression> operands;
+  /** SQL type of the value produced by {@link #evaluate(BeamSqlRow)}. */
+  protected SqlTypeName outputType;
+
+  /** Leaves both fields unset. */
+  protected BeamSqlExpression(){}
+
+  /**
+   * @param operands the operand expressions, in positional order
+   * @param outputType the SQL type of the produced value
+   */
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.operands = operands;
+    this.outputType = outputType;
+  }
+
+  /** Returns the operand at position {@code idx}. */
+  public BeamSqlExpression op(int idx) {
+    return operands.get(idx);
+  }
+
+  /** Returns the output type of the operand at position {@code idx}. */
+  public SqlTypeName opType(int idx) {
+    return op(idx).getOutputType();
+  }
+
+  /**
+   * Evaluates the operand at position {@code idx} against {@code row} and returns the
+   * unwrapped value, cast to whatever type the caller expects.
+   */
+  @SuppressWarnings("unchecked") // the caller picks T; it cannot be verified at runtime
+  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+    return (T) op(idx).evaluate(row).getValue();
+  }
+
+  /**
+   * assertion to make sure the input and output are supported in this expression.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Apply input record {@link BeamSqlRow} to this expression,
+   * the output value is wrapped with {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+
+  /** Returns the operand list as stored, which may be {@code null}. */
+  public List<BeamSqlExpression> getOperands() {
+    return operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  /**
+   * Returns the number of operands; an expression constructed without an operand
+   * list reports {@code 0} instead of failing.
+   */
+  public int numberOfOperands() {
+    return operands == null ? 0 : operands.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..7ba4a46
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * A primitive operation for direct field extraction.
+ *
+ * <p>The expression has no operands (the operand list passed to the parent is {@code null});
+ * it reads the field at a fixed index from the input row.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
+  /** Index of the referenced field in the input row; fixed at construction time. */
+  private final int inputRef;
+
+  /**
+   * @param sqlTypeName the SQL type reported for the extracted field
+   * @param inputRef index of the field to read from each input row
+   */
+  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+    super(null, sqlTypeName);
+    this.inputRef = inputRef;
+  }
+
+  /**
+   * Always returns {@code true}; no validation is performed here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Wraps the value of the referenced field in a {@link BeamSqlPrimitive} of the output type.
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..6a8216b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
+ * It holds the value, and returns it directly during {@link #evaluate(BeamSqlRow)}.
+ *
+ * <p>Instances are created through {@link #of(SqlTypeName, Object)}, which checks that the
+ * value's Java class matches the declared SQL type. A {@code null} value is accepted for
+ * every type.
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * A builder function to create from Type and value directly.
+   *
+   * @throws IllegalArgumentException if {@code value} is not an instance of the Java class
+   *     expected for {@code outputType}
+   * @throws UnsupportedOperationException if {@code value} is non-null and {@code outputType}
+   *     is not one of the types handled by {@link #accept()}
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+    BeamSqlPrimitive<T> exp = new BeamSqlPrimitive<>();
+    exp.outputType = outputType;
+    exp.value = value;
+    if (!exp.accept()) {
+      throw new IllegalArgumentException(
+          String.format("value [%s] doesn't match type [%s].", value, outputType));
+    }
+    return exp;
+  }
+
+  @Override
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  /**
+   * Returns the wrapped value, possibly {@code null}.
+   */
+  public T getValue() {
+    return value;
+  }
+
+  // The typed getters below cast the wrapped value; a value of a different class fails with a
+  // ClassCastException, and the primitive-returning ones fail with a NullPointerException on a
+  // null value because of unboxing.
+
+  public long getLong() {
+    return (Long) getValue();
+  }
+
+  public double getDouble() {
+    return (Double) getValue();
+  }
+
+  public float getFloat() {
+    return (Float) getValue();
+  }
+
+  public int getInteger() {
+    return (Integer) getValue();
+  }
+
+  public short getShort() {
+    return (Short) getValue();
+  }
+
+  public byte getByte() {
+    return (Byte) getValue();
+  }
+
+  public boolean getBoolean() {
+    return (Boolean) getValue();
+  }
+
+  /**
+   * Returns the value as a {@link String}.
+   *
+   * <p>{@link #accept()} admits both {@link String} and {@link NlsString} for character types,
+   * so an {@code NlsString} is unwrapped here instead of failing the cast.
+   */
+  public String getString() {
+    Object raw = getValue();
+    if (raw instanceof NlsString) {
+      return ((NlsString) raw).getValue();
+    }
+    return (String) raw;
+  }
+
+  public Date getDate() {
+    return (Date) getValue();
+  }
+
+  public BigDecimal getDecimal() {
+    return (BigDecimal) getValue();
+  }
+
+  /**
+   * Checks that the wrapped value is an instance of the Java class used for the output type.
+   *
+   * @return {@code true} for a {@code null} value or a matching class, {@code false} otherwise
+   * @throws UnsupportedOperationException if the value is non-null and the output type is not
+   *     handled
+   */
+  @Override
+  public boolean accept() {
+    if (value == null) {
+      return true;
+    }
+
+    switch (outputType) {
+    case BIGINT:
+      return value instanceof Long;
+    case DECIMAL:
+      return value instanceof BigDecimal;
+    case DOUBLE:
+      return value instanceof Double;
+    case FLOAT:
+      return value instanceof Float;
+    case INTEGER:
+      return value instanceof Integer;
+    case SMALLINT:
+      return value instanceof Short;
+    case TINYINT:
+      return value instanceof Byte;
+    case BOOLEAN:
+      return value instanceof Boolean;
+    case CHAR:
+    case VARCHAR:
+      return value instanceof String || value instanceof NlsString;
+    case TIME:
+      return value instanceof GregorianCalendar;
+    case TIMESTAMP:
+    case DATE:
+      return value instanceof Date;
+    case INTERVAL_HOUR:
+    case INTERVAL_MINUTE:
+      return value instanceof BigDecimal;
+    case SYMBOL:
+      // for SYMBOL, it supports anything...
+      return true;
+    default:
+      throw new UnsupportedOperationException(outputType.name());
+    }
+  }
+
+  /**
+   * Returns this instance; the input row is not consulted.
+   */
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..7b4894a
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>The only conversion currently supported is from one of the
+ * {@link SqlTypeName#DATETIME_TYPES} to {@code BIGINT}; the result is the operand's
+ * time expressed in milliseconds.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Accepts exactly one operand of a datetime type, with {@code BIGINT} as the output type.
+   */
+  @Override public boolean accept() {
+    if (getOperands().size() != 1) {
+      return false;
+    }
+    if (outputType != SqlTypeName.BIGINT) {
+      return false;
+    }
+    return SqlTypeName.DATETIME_TYPES.contains(opType(0));
+  }
+
+  /**
+   * Evaluates the single operand and returns its millisecond value as a {@code BIGINT}.
+   */
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    long millis;
+    if (opType(0) == SqlTypeName.TIME) {
+      // TIME operands are read as calendars rather than dates.
+      GregorianCalendar calendar = opValueEvaluated(0, inputRow);
+      millis = calendar.getTimeInMillis();
+    } else {
+      Date instant = opValueEvaluated(0, inputRow);
+      millis = instant.getTime();
+    }
+    return BeamSqlPrimitive.of(outputType, millis);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..42e511d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * invoke a UDF function.
+ *
+ * <p>The target {@link Method} is invoked with a {@code null} receiver, so it has to be a
+ * static method. Because {@code Method} is not serializable, the declaring class, method
+ * name and parameter type names are kept alongside it and used to look the method up again
+ * when the transient reference is missing.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+  //as Method is not Serializable, need to keep class/method information, and rebuild it.
+  private transient Method method;
+  private String className;
+  private String methodName;
+  private List<String> paraClassName = new ArrayList<>();
+
+  /**
+   * @param method the UDF implementation to invoke
+   * @param subExps expressions producing the UDF arguments, in parameter order
+   * @param sqlTypeName SQL type of the UDF result
+   */
+  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+      SqlTypeName sqlTypeName) {
+    super(subExps, sqlTypeName);
+    this.method = method;
+
+    this.className = method.getDeclaringClass().getName();
+    this.methodName = method.getName();
+    for (Class<?> c : method.getParameterTypes()) {
+      paraClassName.add(c.getName());
+    }
+  }
+
+  /**
+   * Always returns {@code true}; no validation is performed here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Evaluates every operand against {@code inputRow}, passes the values to the UDF and wraps
+   * the result in a {@link BeamSqlPrimitive} of the output type.
+   *
+   * @throws RuntimeException wrapping any exception raised while evaluating or invoking
+   */
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (method == null) {
+      reConstructMethod();
+    }
+    try {
+      List<Object> paras = new ArrayList<>();
+      for (BeamSqlExpression e : getOperands()) {
+        paras.add(e.evaluate(inputRow).getValue());
+      }
+
+      return BeamSqlPrimitive.of(getOutputType(),
+          method.invoke(null, paras.toArray(new Object[]{})));
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * re-construct method from class/method.
+   */
+  private void reConstructMethod() {
+    try {
+      List<Class<?>> paraClass = new ArrayList<>();
+      for (String pc : paraClassName) {
+        paraClass.add(resolveParameterClass(pc));
+      }
+      method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Resolves a name produced by {@link Class#getName()} back to its class.
+   *
+   * <p>{@link Class#forName(String)} cannot look up primitive types by name, so those are
+   * mapped explicitly; every other name is delegated to {@code Class.forName}.
+   */
+  private static Class<?> resolveParameterClass(String name) throws ClassNotFoundException {
+    switch (name) {
+      case "boolean":
+        return boolean.class;
+      case "byte":
+        return byte.class;
+      case "char":
+        return char.class;
+      case "short":
+        return short.class;
+      case "int":
+        return int.class;
+      case "long":
+        return long.class;
+      case "float":
+        return float.class;
+      case "double":
+        return double.class;
+      default:
+        return Class.forName(name);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..76f602c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators return the <em>end</em> timestamp of the window, as reported by the
+ * input row.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+  /**
+   * Always returns {@code true}; no validation is performed here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Reads the window end from {@code inputRow} and returns it as a {@code TIMESTAMP} value.
+   */
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    Date windowEnd = new Date(inputRow.getWindowEnd().getMillis());
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, windowEnd);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..21ec6dc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp field, instead it's used to indicate
+ * the event_timestamp field, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Accepts the expression when its first operand is of type {@code DATE}, {@code TIME} or
+   * {@code TIMESTAMP}.
+   */
+  @Override
+  public boolean accept() {
+    SqlTypeName eventTimeType = operands.get(0).getOutputType();
+    return eventTimeType == SqlTypeName.DATE
+        || eventTimeType == SqlTypeName.TIME
+        || eventTimeType == SqlTypeName.TIMESTAMP;
+  }
+
+  /**
+   * Evaluates the first operand and returns its value as a {@code TIMESTAMP}.
+   */
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    Object eventTime = operands.get(0).evaluate(inputRow).getValue();
+    // accept() admits TIME operands, and TIME values are carried as calendars rather than
+    // dates, so convert them instead of failing the cast below.
+    if (eventTime instanceof java.util.Calendar) {
+      eventTime = ((java.util.Calendar) eventTime).getTime();
+    }
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, (Date) eventTime);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..a38fd12
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators return the <em>start</em> timestamp of the window, as reported by the
+ * input row.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+  /**
+   * Always returns {@code true}; no validation is performed here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Reads the window start from {@code inputRow} and returns it as a {@code TIMESTAMP} value.
+   */
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    Date windowStart = new Date(inputRow.getWindowStart().getMillis());
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, windowStart);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..67a35fc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all arithmetic operators.
+ *
+ * <p>Both operands are converted to {@link BigDecimal}, combined by the subclass through
+ * {@link #calc(BigDecimal, BigDecimal)}, and the result is narrowed back to the Java type
+ * matching the output SQL type.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+  // Numeric types ordered from narrowest to widest; the wider operand type wins.
+  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+  static {
+    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+  }
+
+  /**
+   * Creates an expression whose output type is deduced from its first two operands.
+   */
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands.get(0).getOutputType(),
+        operands.get(1).getOutputType()));
+  }
+
+  /**
+   * Creates an expression with an explicitly chosen output type.
+   */
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Evaluates both operands, applies {@link #calc(BigDecimal, BigDecimal)} and returns the
+   * result typed according to the output type.
+   */
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    Object leftValue = opValueEvaluated(0, inputRow);
+    BigDecimal left = toBigDecimal(leftValue);
+    Object rightValue = opValueEvaluated(1, inputRow);
+    BigDecimal right = toBigDecimal(rightValue);
+
+    BigDecimal result = calc(left, right);
+    return getCorrectlyTypedResult(result);
+  }
+
+  /**
+   * Converts an operand value to {@link BigDecimal} without losing precision.
+   *
+   * <p>Integral and decimal values are converted exactly; routing them through
+   * {@code double} would corrupt large {@code BIGINT} and high-precision {@code DECIMAL}
+   * operands. Any other value goes through its {@code double} representation.
+   */
+  private static BigDecimal toBigDecimal(Object value) {
+    if (value instanceof BigDecimal) {
+      return (BigDecimal) value;
+    }
+    if (value instanceof Long || value instanceof Integer
+        || value instanceof Short || value instanceof Byte) {
+      return BigDecimal.valueOf(((Number) value).longValue());
+    }
+    return BigDecimal.valueOf(Double.valueOf(value.toString()));
+  }
+
+  /**
+   * Applies the concrete arithmetic operation to the two operands.
+   */
+  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+  /**
+   * Picks the output type for a pair of operand types: the wider of the two, except that
+   * mixing {@code FLOAT} with {@code DECIMAL} yields {@code DOUBLE}.
+   */
+  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+    int leftIndex = ORDERED_APPROX_TYPES.indexOf(left);
+    int rightIndex = ORDERED_APPROX_TYPES.indexOf(right);
+    if ((left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT)
+        && (left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL)) {
+      return SqlTypeName.DOUBLE;
+    }
+
+    // On a tie the left type is kept.
+    return leftIndex < rightIndex ? right : left;
+  }
+
+  /**
+   * Accepts exactly two operands, both of a numeric type.
+   */
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression operand : operands) {
+      if (!SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Narrows {@code rawResult} to the Java type that corresponds to the output SQL type.
+   *
+   * <p>Narrowing uses the plain {@code xxxValue()} conversions, so out-of-range results
+   * are truncated rather than rejected.
+   */
+  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+    Number actualValue;
+    switch (outputType) {
+      case TINYINT:
+        actualValue = rawResult.byteValue();
+        break;
+      case SMALLINT:
+        actualValue = rawResult.shortValue();
+        break;
+      case INTEGER:
+        actualValue = rawResult.intValue();
+        break;
+      case BIGINT:
+        actualValue = rawResult.longValue();
+        break;
+      case FLOAT:
+        actualValue = rawResult.floatValue();
+        break;
+      case DOUBLE:
+        actualValue = rawResult.doubleValue();
+        break;
+      case DECIMAL:
+      default:
+        actualValue = rawResult;
+    }
+    return BeamSqlPrimitive.of(outputType, actualValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..fbe3fc4
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '/' operator.
+ *
+ * <p>The quotient is computed with a fixed scale of 10 digits and half-even rounding.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+  /** Number of fractional digits kept in the quotient. */
+  private static final int QUOTIENT_SCALE = 10;
+  /** Rounding applied when the exact quotient needs more digits than the scale allows. */
+  private static final RoundingMode QUOTIENT_ROUNDING = RoundingMode.HALF_EVEN;
+
+  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Returns {@code left / right}.
+   */
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    BigDecimal quotient = left.divide(right, QUOTIENT_SCALE, QUOTIENT_ROUNDING);
+    return quotient;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b5482d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
new file mode 100644
index 0000000..0241574
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/interpreter/operator/arithmetic/BeamSqlMinusExpression.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '-' operator.
+ *
+ * <p>Subtracts the second operand from the first.
+ */
+public class BeamSqlMinusExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlMinusExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  /**
+   * Returns {@code left - right}.
+   */
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    BigDecimal difference = left.subtract(right);
+    return difference;
+  }
+}