You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/08/02 05:09:21 UTC

[58/59] beam git commit: move all implementation classes/packages into impl package

move all implementation classes/packages into impl package


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/febd044a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/febd044a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/febd044a

Branch: refs/heads/DSL_SQL
Commit: febd044ae306a28fa3797a1663e54c1d7fbe43ce
Parents: c1b5482
Author: James Xu <xu...@gmail.com>
Authored: Mon Jul 31 17:11:53 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Jul 31 17:11:53 2017 +0800

----------------------------------------------------------------------
 .../apache/beam/sdk/extensions/sql/BeamSql.java |   2 +-
 .../beam/sdk/extensions/sql/BeamSqlCli.java     |   2 +-
 .../beam/sdk/extensions/sql/BeamSqlEnv.java     |   4 +-
 .../interpreter/BeamSqlExpressionExecutor.java  |  43 ++
 .../sql/impl/interpreter/BeamSqlFnExecutor.java | 442 +++++++++++++++++++
 .../operator/BeamSqlCaseExpression.java         |  63 +++
 .../operator/BeamSqlCastExpression.java         | 131 ++++++
 .../interpreter/operator/BeamSqlExpression.java |  78 ++++
 .../operator/BeamSqlInputRefExpression.java     |  43 ++
 .../interpreter/operator/BeamSqlPrimitive.java  | 152 +++++++
 .../operator/BeamSqlReinterpretExpression.java  |  54 +++
 .../operator/BeamSqlUdfExpression.java          |  86 ++++
 .../operator/BeamSqlWindowEndExpression.java    |  42 ++
 .../operator/BeamSqlWindowExpression.java       |  50 +++
 .../operator/BeamSqlWindowStartExpression.java  |  43 ++
 .../arithmetic/BeamSqlArithmeticExpression.java | 122 +++++
 .../arithmetic/BeamSqlDivideExpression.java     |  37 ++
 .../arithmetic/BeamSqlMinusExpression.java      |  36 ++
 .../arithmetic/BeamSqlModExpression.java        |  36 ++
 .../arithmetic/BeamSqlMultiplyExpression.java   |  36 ++
 .../arithmetic/BeamSqlPlusExpression.java       |  36 ++
 .../operator/arithmetic/package-info.java       |  22 +
 .../comparison/BeamSqlCompareExpression.java    |  96 ++++
 .../comparison/BeamSqlEqualsExpression.java     |  49 ++
 .../BeamSqlGreaterThanExpression.java           |  49 ++
 .../BeamSqlGreaterThanOrEqualsExpression.java   |  49 ++
 .../comparison/BeamSqlIsNotNullExpression.java  |  53 +++
 .../comparison/BeamSqlIsNullExpression.java     |  53 +++
 .../comparison/BeamSqlLessThanExpression.java   |  49 ++
 .../BeamSqlLessThanOrEqualsExpression.java      |  49 ++
 .../comparison/BeamSqlNotEqualsExpression.java  |  49 ++
 .../operator/comparison/package-info.java       |  22 +
 .../date/BeamSqlCurrentDateExpression.java      |  44 ++
 .../date/BeamSqlCurrentTimeExpression.java      |  52 +++
 .../date/BeamSqlCurrentTimestampExpression.java |  48 ++
 .../date/BeamSqlDateCeilExpression.java         |  54 +++
 .../date/BeamSqlDateFloorExpression.java        |  54 +++
 .../operator/date/BeamSqlExtractExpression.java | 101 +++++
 .../interpreter/operator/date/package-info.java |  22 +
 .../operator/logical/BeamSqlAndExpression.java  |  47 ++
 .../logical/BeamSqlLogicalExpression.java       |  46 ++
 .../operator/logical/BeamSqlNotExpression.java  |  53 +++
 .../operator/logical/BeamSqlOrExpression.java   |  47 ++
 .../operator/logical/package-info.java          |  22 +
 .../operator/math/BeamSqlAbsExpression.java     |  74 ++++
 .../operator/math/BeamSqlAcosExpression.java    |  40 ++
 .../operator/math/BeamSqlAsinExpression.java    |  40 ++
 .../operator/math/BeamSqlAtan2Expression.java   |  42 ++
 .../operator/math/BeamSqlAtanExpression.java    |  40 ++
 .../operator/math/BeamSqlCeilExpression.java    |  45 ++
 .../operator/math/BeamSqlCosExpression.java     |  40 ++
 .../operator/math/BeamSqlCotExpression.java     |  40 ++
 .../operator/math/BeamSqlDegreesExpression.java |  40 ++
 .../operator/math/BeamSqlExpExpression.java     |  40 ++
 .../operator/math/BeamSqlFloorExpression.java   |  45 ++
 .../operator/math/BeamSqlLnExpression.java      |  40 ++
 .../operator/math/BeamSqlLogExpression.java     |  40 ++
 .../math/BeamSqlMathBinaryExpression.java       |  63 +++
 .../math/BeamSqlMathUnaryExpression.java        |  58 +++
 .../operator/math/BeamSqlPiExpression.java      |  42 ++
 .../operator/math/BeamSqlPowerExpression.java   |  44 ++
 .../operator/math/BeamSqlRadiansExpression.java |  40 ++
 .../operator/math/BeamSqlRandExpression.java    |  54 +++
 .../math/BeamSqlRandIntegerExpression.java      |  58 +++
 .../operator/math/BeamSqlRoundExpression.java   | 107 +++++
 .../operator/math/BeamSqlSignExpression.java    |  72 +++
 .../operator/math/BeamSqlSinExpression.java     |  40 ++
 .../operator/math/BeamSqlTanExpression.java     |  40 ++
 .../math/BeamSqlTruncateExpression.java         |  75 ++++
 .../interpreter/operator/math/package-info.java |  22 +
 .../impl/interpreter/operator/package-info.java |  22 +
 .../string/BeamSqlCharLengthExpression.java     |  39 ++
 .../string/BeamSqlConcatExpression.java         |  62 +++
 .../string/BeamSqlInitCapExpression.java        |  55 +++
 .../operator/string/BeamSqlLowerExpression.java |  39 ++
 .../string/BeamSqlOverlayExpression.java        |  76 ++++
 .../string/BeamSqlPositionExpression.java       |  72 +++
 .../string/BeamSqlStringUnaryExpression.java    |  44 ++
 .../string/BeamSqlSubstringExpression.java      |  82 ++++
 .../operator/string/BeamSqlTrimExpression.java  | 101 +++++
 .../operator/string/BeamSqlUpperExpression.java |  39 ++
 .../operator/string/package-info.java           |  22 +
 .../sql/impl/interpreter/package-info.java      |  22 +
 .../sql/impl/planner/BeamQueryPlanner.java      | 167 +++++++
 .../sql/impl/planner/BeamRelDataTypeSystem.java |  40 ++
 .../sql/impl/planner/BeamRuleSets.java          |  75 ++++
 .../sql/impl/planner/package-info.java          |  24 +
 .../sql/impl/rel/BeamAggregationRel.java        | 182 ++++++++
 .../extensions/sql/impl/rel/BeamFilterRel.java  |  70 +++
 .../extensions/sql/impl/rel/BeamIOSinkRel.java  |  75 ++++
 .../sql/impl/rel/BeamIOSourceRel.java           |  63 +++
 .../sql/impl/rel/BeamIntersectRel.java          |  58 +++
 .../extensions/sql/impl/rel/BeamJoinRel.java    | 302 +++++++++++++
 .../sql/impl/rel/BeamLogicalConvention.java     |  72 +++
 .../extensions/sql/impl/rel/BeamMinusRel.java   |  56 +++
 .../extensions/sql/impl/rel/BeamProjectRel.java |  81 ++++
 .../extensions/sql/impl/rel/BeamRelNode.java    |  38 ++
 .../sql/impl/rel/BeamSetOperatorRelBase.java    |  98 ++++
 .../extensions/sql/impl/rel/BeamSortRel.java    | 247 +++++++++++
 .../sql/impl/rel/BeamSqlRelUtils.java           |  72 +++
 .../extensions/sql/impl/rel/BeamUnionRel.java   |  88 ++++
 .../extensions/sql/impl/rel/BeamValuesRel.java  |  79 ++++
 .../extensions/sql/impl/rel/package-info.java   |  23 +
 .../sql/impl/rule/BeamAggregationRule.java      | 162 +++++++
 .../sql/impl/rule/BeamFilterRule.java           |  49 ++
 .../sql/impl/rule/BeamIOSinkRule.java           |  81 ++++
 .../sql/impl/rule/BeamIOSourceRule.java         |  49 ++
 .../sql/impl/rule/BeamIntersectRule.java        |  50 +++
 .../extensions/sql/impl/rule/BeamJoinRule.java  |  53 +++
 .../extensions/sql/impl/rule/BeamMinusRule.java |  50 +++
 .../sql/impl/rule/BeamProjectRule.java          |  50 +++
 .../extensions/sql/impl/rule/BeamSortRule.java  |  51 +++
 .../extensions/sql/impl/rule/BeamUnionRule.java |  50 +++
 .../sql/impl/rule/BeamValuesRule.java           |  48 ++
 .../extensions/sql/impl/rule/package-info.java  |  23 +
 .../transform/BeamAggregationTransforms.java    | 300 +++++++++++++
 .../impl/transform/BeamBuiltinAggregations.java | 412 +++++++++++++++++
 .../sql/impl/transform/BeamJoinTransforms.java  | 166 +++++++
 .../transform/BeamSetOperatorsTransforms.java   | 111 +++++
 .../sql/impl/transform/BeamSqlFilterFn.java     |  62 +++
 .../transform/BeamSqlOutputToConsoleFn.java     |  41 ++
 .../sql/impl/transform/BeamSqlProjectFn.java    |  72 +++
 .../sql/impl/transform/package-info.java        |  22 +
 .../extensions/sql/impl/utils/CalciteUtils.java | 113 +++++
 .../extensions/sql/impl/utils/package-info.java |  22 +
 .../interpreter/BeamSqlExpressionExecutor.java  |  43 --
 .../sql/interpreter/BeamSqlFnExecutor.java      | 442 -------------------
 .../operator/BeamSqlCaseExpression.java         |  63 ---
 .../operator/BeamSqlCastExpression.java         | 131 ------
 .../interpreter/operator/BeamSqlExpression.java |  78 ----
 .../operator/BeamSqlInputRefExpression.java     |  43 --
 .../interpreter/operator/BeamSqlPrimitive.java  | 152 -------
 .../operator/BeamSqlReinterpretExpression.java  |  54 ---
 .../operator/BeamSqlUdfExpression.java          |  86 ----
 .../operator/BeamSqlWindowEndExpression.java    |  42 --
 .../operator/BeamSqlWindowExpression.java       |  50 ---
 .../operator/BeamSqlWindowStartExpression.java  |  43 --
 .../arithmetic/BeamSqlArithmeticExpression.java | 122 -----
 .../arithmetic/BeamSqlDivideExpression.java     |  37 --
 .../arithmetic/BeamSqlMinusExpression.java      |  36 --
 .../arithmetic/BeamSqlModExpression.java        |  36 --
 .../arithmetic/BeamSqlMultiplyExpression.java   |  36 --
 .../arithmetic/BeamSqlPlusExpression.java       |  36 --
 .../operator/arithmetic/package-info.java       |  22 -
 .../comparison/BeamSqlCompareExpression.java    |  96 ----
 .../comparison/BeamSqlEqualsExpression.java     |  49 --
 .../BeamSqlGreaterThanExpression.java           |  49 --
 .../BeamSqlGreaterThanOrEqualsExpression.java   |  49 --
 .../comparison/BeamSqlIsNotNullExpression.java  |  53 ---
 .../comparison/BeamSqlIsNullExpression.java     |  53 ---
 .../comparison/BeamSqlLessThanExpression.java   |  49 --
 .../BeamSqlLessThanOrEqualsExpression.java      |  49 --
 .../comparison/BeamSqlNotEqualsExpression.java  |  49 --
 .../operator/comparison/package-info.java       |  22 -
 .../date/BeamSqlCurrentDateExpression.java      |  44 --
 .../date/BeamSqlCurrentTimeExpression.java      |  52 ---
 .../date/BeamSqlCurrentTimestampExpression.java |  48 --
 .../date/BeamSqlDateCeilExpression.java         |  54 ---
 .../date/BeamSqlDateFloorExpression.java        |  54 ---
 .../operator/date/BeamSqlExtractExpression.java | 101 -----
 .../interpreter/operator/date/package-info.java |  22 -
 .../operator/logical/BeamSqlAndExpression.java  |  47 --
 .../logical/BeamSqlLogicalExpression.java       |  46 --
 .../operator/logical/BeamSqlNotExpression.java  |  53 ---
 .../operator/logical/BeamSqlOrExpression.java   |  47 --
 .../operator/logical/package-info.java          |  22 -
 .../operator/math/BeamSqlAbsExpression.java     |  74 ----
 .../operator/math/BeamSqlAcosExpression.java    |  40 --
 .../operator/math/BeamSqlAsinExpression.java    |  40 --
 .../operator/math/BeamSqlAtan2Expression.java   |  42 --
 .../operator/math/BeamSqlAtanExpression.java    |  40 --
 .../operator/math/BeamSqlCeilExpression.java    |  45 --
 .../operator/math/BeamSqlCosExpression.java     |  40 --
 .../operator/math/BeamSqlCotExpression.java     |  40 --
 .../operator/math/BeamSqlDegreesExpression.java |  40 --
 .../operator/math/BeamSqlExpExpression.java     |  40 --
 .../operator/math/BeamSqlFloorExpression.java   |  45 --
 .../operator/math/BeamSqlLnExpression.java      |  40 --
 .../operator/math/BeamSqlLogExpression.java     |  40 --
 .../math/BeamSqlMathBinaryExpression.java       |  63 ---
 .../math/BeamSqlMathUnaryExpression.java        |  58 ---
 .../operator/math/BeamSqlPiExpression.java      |  42 --
 .../operator/math/BeamSqlPowerExpression.java   |  44 --
 .../operator/math/BeamSqlRadiansExpression.java |  40 --
 .../operator/math/BeamSqlRandExpression.java    |  54 ---
 .../math/BeamSqlRandIntegerExpression.java      |  58 ---
 .../operator/math/BeamSqlRoundExpression.java   | 107 -----
 .../operator/math/BeamSqlSignExpression.java    |  72 ---
 .../operator/math/BeamSqlSinExpression.java     |  40 --
 .../operator/math/BeamSqlTanExpression.java     |  40 --
 .../math/BeamSqlTruncateExpression.java         |  75 ----
 .../interpreter/operator/math/package-info.java |  22 -
 .../sql/interpreter/operator/package-info.java  |  22 -
 .../string/BeamSqlCharLengthExpression.java     |  39 --
 .../string/BeamSqlConcatExpression.java         |  62 ---
 .../string/BeamSqlInitCapExpression.java        |  55 ---
 .../operator/string/BeamSqlLowerExpression.java |  39 --
 .../string/BeamSqlOverlayExpression.java        |  76 ----
 .../string/BeamSqlPositionExpression.java       |  72 ---
 .../string/BeamSqlStringUnaryExpression.java    |  44 --
 .../string/BeamSqlSubstringExpression.java      |  82 ----
 .../operator/string/BeamSqlTrimExpression.java  | 101 -----
 .../operator/string/BeamSqlUpperExpression.java |  39 --
 .../operator/string/package-info.java           |  22 -
 .../sql/interpreter/package-info.java           |  22 -
 .../sql/planner/BeamQueryPlanner.java           | 167 -------
 .../sql/planner/BeamRelDataTypeSystem.java      |  40 --
 .../extensions/sql/planner/BeamRuleSets.java    |  75 ----
 .../extensions/sql/planner/package-info.java    |  24 -
 .../extensions/sql/rel/BeamAggregationRel.java  | 182 --------
 .../sdk/extensions/sql/rel/BeamFilterRel.java   |  70 ---
 .../sdk/extensions/sql/rel/BeamIOSinkRel.java   |  75 ----
 .../sdk/extensions/sql/rel/BeamIOSourceRel.java |  63 ---
 .../extensions/sql/rel/BeamIntersectRel.java    |  58 ---
 .../sdk/extensions/sql/rel/BeamJoinRel.java     | 302 -------------
 .../sql/rel/BeamLogicalConvention.java          |  72 ---
 .../sdk/extensions/sql/rel/BeamMinusRel.java    |  56 ---
 .../sdk/extensions/sql/rel/BeamProjectRel.java  |  81 ----
 .../sdk/extensions/sql/rel/BeamRelNode.java     |  38 --
 .../sql/rel/BeamSetOperatorRelBase.java         |  98 ----
 .../sdk/extensions/sql/rel/BeamSortRel.java     | 247 -----------
 .../sdk/extensions/sql/rel/BeamSqlRelUtils.java |  72 ---
 .../sdk/extensions/sql/rel/BeamUnionRel.java    |  88 ----
 .../sdk/extensions/sql/rel/BeamValuesRel.java   |  79 ----
 .../sdk/extensions/sql/rel/package-info.java    |  23 -
 .../sql/rule/BeamAggregationRule.java           | 162 -------
 .../sdk/extensions/sql/rule/BeamFilterRule.java |  49 --
 .../sdk/extensions/sql/rule/BeamIOSinkRule.java |  81 ----
 .../extensions/sql/rule/BeamIOSourceRule.java   |  49 --
 .../extensions/sql/rule/BeamIntersectRule.java  |  50 ---
 .../sdk/extensions/sql/rule/BeamJoinRule.java   |  53 ---
 .../sdk/extensions/sql/rule/BeamMinusRule.java  |  50 ---
 .../extensions/sql/rule/BeamProjectRule.java    |  50 ---
 .../sdk/extensions/sql/rule/BeamSortRule.java   |  51 ---
 .../sdk/extensions/sql/rule/BeamUnionRule.java  |  50 ---
 .../sdk/extensions/sql/rule/BeamValuesRule.java |  48 --
 .../sdk/extensions/sql/rule/package-info.java   |  23 -
 .../sdk/extensions/sql/schema/BeamSqlRow.java   |   2 +-
 .../extensions/sql/schema/BeamSqlRowCoder.java  |   2 +-
 .../extensions/sql/schema/BeamTableUtils.java   |   2 +-
 .../transform/BeamAggregationTransforms.java    | 300 -------------
 .../sql/transform/BeamBuiltinAggregations.java  | 412 -----------------
 .../sql/transform/BeamJoinTransforms.java       | 166 -------
 .../transform/BeamSetOperatorsTransforms.java   | 111 -----
 .../sql/transform/BeamSqlFilterFn.java          |  62 ---
 .../sql/transform/BeamSqlOutputToConsoleFn.java |  41 --
 .../sql/transform/BeamSqlProjectFn.java         |  72 ---
 .../extensions/sql/transform/package-info.java  |  22 -
 .../sdk/extensions/sql/utils/CalciteUtils.java  | 113 -----
 .../sdk/extensions/sql/utils/package-info.java  |  22 -
 .../sdk/extensions/sql/BeamSqlDslJoinTest.java  |   4 +-
 .../impl/interpreter/BeamSqlFnExecutorTest.java | 416 +++++++++++++++++
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  92 ++++
 .../operator/BeamNullExperssionTest.java        |  55 +++
 .../operator/BeamSqlAndOrExpressionTest.java    |  61 +++
 .../operator/BeamSqlCaseExpressionTest.java     |  93 ++++
 .../operator/BeamSqlCastExpressionTest.java     | 125 ++++++
 .../operator/BeamSqlCompareExpressionTest.java  | 115 +++++
 .../operator/BeamSqlInputRefExpressionTest.java |  57 +++
 .../operator/BeamSqlPrimitiveTest.java          |  59 +++
 .../BeamSqlReinterpretExpressionTest.java       |  75 ++++
 .../operator/BeamSqlUdfExpressionTest.java      |  51 +++
 .../BeamSqlArithmeticExpressionTest.java        | 237 ++++++++++
 .../date/BeamSqlCurrentDateExpressionTest.java  |  38 ++
 .../date/BeamSqlCurrentTimeExpressionTest.java  |  39 ++
 .../BeamSqlCurrentTimestampExpressionTest.java  |  39 ++
 .../date/BeamSqlDateCeilExpressionTest.java     |  50 +++
 .../date/BeamSqlDateExpressionTestBase.java     |  51 +++
 .../date/BeamSqlDateFloorExpressionTest.java    |  49 ++
 .../date/BeamSqlExtractExpressionTest.java      | 103 +++++
 .../logical/BeamSqlNotExpressionTest.java       |  47 ++
 .../math/BeamSqlMathBinaryExpressionTest.java   | 201 +++++++++
 .../math/BeamSqlMathUnaryExpressionTest.java    | 309 +++++++++++++
 .../string/BeamSqlCharLengthExpressionTest.java |  44 ++
 .../string/BeamSqlConcatExpressionTest.java     |  66 +++
 .../string/BeamSqlInitCapExpressionTest.java    |  54 +++
 .../string/BeamSqlLowerExpressionTest.java      |  44 ++
 .../string/BeamSqlOverlayExpressionTest.java    |  87 ++++
 .../string/BeamSqlPositionExpressionTest.java   |  84 ++++
 .../BeamSqlStringUnaryExpressionTest.java       |  52 +++
 .../string/BeamSqlSubstringExpressionTest.java  | 101 +++++
 .../string/BeamSqlTrimExpressionTest.java       | 103 +++++
 .../string/BeamSqlUpperExpressionTest.java      |  44 ++
 .../sql/impl/rel/BeamIntersectRelTest.java      | 119 +++++
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    | 204 +++++++++
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  | 241 ++++++++++
 .../BeamJoinRelUnboundedVsUnboundedTest.java    | 219 +++++++++
 .../sql/impl/rel/BeamMinusRelTest.java          | 118 +++++
 .../impl/rel/BeamSetOperatorRelBaseTest.java    | 106 +++++
 .../sql/impl/rel/BeamSortRelTest.java           | 237 ++++++++++
 .../sql/impl/rel/BeamUnionRelTest.java          | 104 +++++
 .../sql/impl/rel/BeamValuesRelTest.java         | 105 +++++
 .../sdk/extensions/sql/impl/rel/CheckSize.java  |  41 ++
 .../sql/interpreter/BeamSqlFnExecutorTest.java  | 416 -----------------
 .../interpreter/BeamSqlFnExecutorTestBase.java  |  92 ----
 .../operator/BeamNullExperssionTest.java        |  55 ---
 .../operator/BeamSqlAndOrExpressionTest.java    |  61 ---
 .../operator/BeamSqlCaseExpressionTest.java     |  93 ----
 .../operator/BeamSqlCastExpressionTest.java     | 125 ------
 .../operator/BeamSqlCompareExpressionTest.java  | 115 -----
 .../operator/BeamSqlInputRefExpressionTest.java |  57 ---
 .../operator/BeamSqlPrimitiveTest.java          |  59 ---
 .../BeamSqlReinterpretExpressionTest.java       |  75 ----
 .../operator/BeamSqlUdfExpressionTest.java      |  51 ---
 .../BeamSqlArithmeticExpressionTest.java        | 237 ----------
 .../date/BeamSqlCurrentDateExpressionTest.java  |  38 --
 .../date/BeamSqlCurrentTimeExpressionTest.java  |  39 --
 .../BeamSqlCurrentTimestampExpressionTest.java  |  39 --
 .../date/BeamSqlDateCeilExpressionTest.java     |  50 ---
 .../date/BeamSqlDateExpressionTestBase.java     |  51 ---
 .../date/BeamSqlDateFloorExpressionTest.java    |  49 --
 .../date/BeamSqlExtractExpressionTest.java      | 103 -----
 .../logical/BeamSqlNotExpressionTest.java       |  47 --
 .../math/BeamSqlMathBinaryExpressionTest.java   | 201 ---------
 .../math/BeamSqlMathUnaryExpressionTest.java    | 309 -------------
 .../string/BeamSqlCharLengthExpressionTest.java |  44 --
 .../string/BeamSqlConcatExpressionTest.java     |  66 ---
 .../string/BeamSqlInitCapExpressionTest.java    |  54 ---
 .../string/BeamSqlLowerExpressionTest.java      |  44 --
 .../string/BeamSqlOverlayExpressionTest.java    |  87 ----
 .../string/BeamSqlPositionExpressionTest.java   |  84 ----
 .../BeamSqlStringUnaryExpressionTest.java       |  52 ---
 .../string/BeamSqlSubstringExpressionTest.java  | 101 -----
 .../string/BeamSqlTrimExpressionTest.java       | 103 -----
 .../string/BeamSqlUpperExpressionTest.java      |  44 --
 .../sql/rel/BeamIntersectRelTest.java           | 119 -----
 .../rel/BeamJoinRelBoundedVsBoundedTest.java    | 204 ---------
 .../rel/BeamJoinRelUnboundedVsBoundedTest.java  | 241 ----------
 .../BeamJoinRelUnboundedVsUnboundedTest.java    | 219 ---------
 .../extensions/sql/rel/BeamMinusRelTest.java    | 118 -----
 .../sql/rel/BeamSetOperatorRelBaseTest.java     | 106 -----
 .../sdk/extensions/sql/rel/BeamSortRelTest.java | 237 ----------
 .../extensions/sql/rel/BeamUnionRelTest.java    | 104 -----
 .../extensions/sql/rel/BeamValuesRelTest.java   | 105 -----
 .../beam/sdk/extensions/sql/rel/CheckSize.java  |  41 --
 .../sql/schema/BeamSqlRowCoderTest.java         |   2 +-
 .../sql/schema/kafka/BeamKafkaCSVTableTest.java |   4 +-
 .../sql/schema/text/BeamTextCSVTableTest.java   |   4 +-
 .../transform/BeamAggregationTransformTest.java |   6 +-
 .../schema/transform/BeamTransformBaseTest.java |   4 +-
 340 files changed, 13117 insertions(+), 13117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index d64ae41..e0d7a78 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
 
 import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.schema.BeamPCollectionTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 714e102..3bea46a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.extensions.sql;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index ca73b13..be0b0af 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -18,12 +18,12 @@
 package org.apache.beam.sdk.extensions.sql;
 
 import java.io.Serializable;
-import org.apache.beam.sdk.extensions.sql.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRowType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
-import org.apache.beam.sdk.extensions.sql.utils.CalciteUtils;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.rel.type.RelDataType;

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
new file mode 100644
index 0000000..1ae6bb3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlExpressionExecutor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+
+/**
+ * {@code BeamSqlExpressionExecutor} fills the gap between relational
+ * expressions in Calcite SQL and executable code.
+ */
+public interface BeamSqlExpressionExecutor extends Serializable {
+
+  /** Invoked before data processing. */
+  void prepare();
+
+  /**
+   * Applies the transformation to the input record {@link BeamSqlRow}.
+   *
+   * @param inputRow the input record to apply the transformation to
+   * @return the outcome of the transformation, as a list of objects
+   */
+  List<Object> execute(BeamSqlRow inputRow);
+
+  /** Presumably invoked once data processing is done (TODO confirm with implementations). */
+  void close();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
new file mode 100644
index 0000000..1f9e0e3
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlCastExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlReinterpretExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlGreaterThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNotNullExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlIsNullExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlLessThanOrEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.comparison.BeamSqlNotEqualsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlCurrentTimestampExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateCeilExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlDateFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.date.BeamSqlExtractExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlAndExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlNotExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.logical.BeamSqlOrExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAbsExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAcosExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAsinExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtan2Expression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlAtanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCeilExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCosExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlCotExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlDegreesExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlExpExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlFloorExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLnExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlLogExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPiExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlPowerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRadiansExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRandIntegerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlRoundExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSignExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlSinExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTanExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}.
+ *
+ * <p>{@code BeamSqlFnExecutor} converts the expressions of a {@link BeamRelNode}
+ * (a {@link BeamFilterRel} or a {@link BeamProjectRel}) to {@link BeamSqlExpression}s,
+ * which can be evaluated against a {@link BeamSqlRow}.
+ */
+public class BeamSqlFnExecutor implements BeamSqlExpressionExecutor {
+  /** One expression per output value, in the order they are returned by {@link #execute}. */
+  protected List<BeamSqlExpression> exps;
+
+  /**
+   * Builds the expressions for the given node: the single condition of a
+   * {@link BeamFilterRel}, or one expression per projection of a {@link BeamProjectRel}.
+   *
+   * @param relNode the relational node to translate
+   * @throws UnsupportedOperationException if {@code relNode} is neither a filter nor a project
+   */
+  public BeamSqlFnExecutor(BeamRelNode relNode) {
+    this.exps = new ArrayList<>();
+    if (relNode instanceof BeamFilterRel) {
+      BeamFilterRel filterNode = (BeamFilterRel) relNode;
+      RexNode condition = filterNode.getCondition();
+      exps.add(buildExpression(condition));
+    } else if (relNode instanceof BeamProjectRel) {
+      BeamProjectRel projectNode = (BeamProjectRel) relNode;
+      List<RexNode> projects = projectNode.getProjects();
+      for (RexNode rexNode : projects) {
+        exps.add(buildExpression(rexNode));
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", relNode.getClass().toString()));
+    }
+  }
+
+  /**
+   * {@link #buildExpression(RexNode)} visits the operands of {@link RexNode} recursively,
+   * and represent each {@link SqlOperator} with a corresponding {@link BeamSqlExpression}.
+   *
+   * @param rexNode a {@link RexLiteral}, {@link RexInputRef} or {@link RexCall}
+   * @return the equivalent expression
+   * @throws UnsupportedOperationException if the node type or the operator is not supported
+   * @throws IllegalStateException if the built expression rejects its operands, or a DECIMAL
+   *     literal has a display type this method does not know how to convert to
+   */
+  static BeamSqlExpression buildExpression(RexNode rexNode) {
+    BeamSqlExpression ret = null;
+    if (rexNode instanceof RexLiteral) {
+      RexLiteral node = (RexLiteral) rexNode;
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type) && value instanceof NlsString) {
+        // NlsString is not serializable, we need to convert
+        // it to string explicitly.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // does this actually make sense?
+        // Calcite actually treat Calendar as the java type of Date Literal
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
+      } else {
+        // node.getType().getSqlTypeName() and node.getSqlTypeName() can be different
+        // e.g. sql: "select 1"
+        // here the literal 1 will be parsed as a RexLiteral where:
+        //     node.getType().getSqlTypeName() = INTEGER (the display type)
+        //     node.getSqlTypeName() = DECIMAL (the actual internal storage format)
+        // So we need to do a convert here.
+        // check RexBuilder#makeLiteral for more information.
+        SqlTypeName realType = node.getType().getSqlTypeName();
+        Object realValue = value;
+        if (type == SqlTypeName.DECIMAL) {
+          BigDecimal rawValue = (BigDecimal) value;
+          switch (realType) {
+            case TINYINT:
+              realValue = (byte) rawValue.intValue();
+              break;
+            case SMALLINT:
+              realValue = (short) rawValue.intValue();
+              break;
+            case INTEGER:
+              realValue = rawValue.intValue();
+              break;
+            case BIGINT:
+              realValue = rawValue.longValue();
+              break;
+            case DECIMAL:
+              realValue = rawValue;
+              break;
+            default:
+              throw new IllegalStateException("type/realType mismatch: "
+                  + type + " VS " + realType);
+          }
+        } else if (type == SqlTypeName.DOUBLE && value != null) {
+          // Calcite keeps approximate numeric literals as BigDecimal (see RexLiteral),
+          // so convert through Number; a direct cast to Double would throw
+          // ClassCastException for such values.
+          Number rawValue = (Number) value;
+          if (realType == SqlTypeName.FLOAT) {
+            realValue = rawValue.floatValue();
+          } else if (realType == SqlTypeName.DOUBLE) {
+            realValue = rawValue.doubleValue();
+          }
+        }
+        return BeamSqlPrimitive.of(realType, realValue);
+      }
+    } else if (rexNode instanceof RexInputRef) {
+      RexInputRef node = (RexInputRef) rexNode;
+      ret = new BeamSqlInputRefExpression(node.getType().getSqlTypeName(), node.getIndex());
+    } else if (rexNode instanceof RexCall) {
+      RexCall node = (RexCall) rexNode;
+      String opName = node.op.getName();
+      // translate the operands first, the operator is built on top of them
+      List<BeamSqlExpression> subExps = new ArrayList<>();
+      for (RexNode subNode : node.getOperands()) {
+        subExps.add(buildExpression(subNode));
+      }
+      switch (opName) {
+        // logical operators
+        case "AND":
+          ret = new BeamSqlAndExpression(subExps);
+          break;
+        case "OR":
+          ret = new BeamSqlOrExpression(subExps);
+          break;
+        case "NOT":
+          ret = new BeamSqlNotExpression(subExps);
+          break;
+        case "=":
+          ret = new BeamSqlEqualsExpression(subExps);
+          break;
+        case "<>":
+          ret = new BeamSqlNotEqualsExpression(subExps);
+          break;
+        case ">":
+          ret = new BeamSqlGreaterThanExpression(subExps);
+          break;
+        case ">=":
+          ret = new BeamSqlGreaterThanOrEqualsExpression(subExps);
+          break;
+        case "<":
+          ret = new BeamSqlLessThanExpression(subExps);
+          break;
+        case "<=":
+          ret = new BeamSqlLessThanOrEqualsExpression(subExps);
+          break;
+
+        // arithmetic operators
+        case "+":
+          ret = new BeamSqlPlusExpression(subExps);
+          break;
+        case "-":
+          ret = new BeamSqlMinusExpression(subExps);
+          break;
+        case "*":
+          ret = new BeamSqlMultiplyExpression(subExps);
+          break;
+        case "/":
+        case "/INT":
+          ret = new BeamSqlDivideExpression(subExps);
+          break;
+        case "MOD":
+          ret = new BeamSqlModExpression(subExps);
+          break;
+
+        // math functions
+        case "ABS":
+          ret = new BeamSqlAbsExpression(subExps);
+          break;
+        case "ROUND":
+          ret = new BeamSqlRoundExpression(subExps);
+          break;
+        case "LN":
+          ret = new BeamSqlLnExpression(subExps);
+          break;
+        case "LOG10":
+          ret = new BeamSqlLogExpression(subExps);
+          break;
+        case "EXP":
+          ret = new BeamSqlExpExpression(subExps);
+          break;
+        case "ACOS":
+          ret = new BeamSqlAcosExpression(subExps);
+          break;
+        case "ASIN":
+          ret = new BeamSqlAsinExpression(subExps);
+          break;
+        case "ATAN":
+          ret = new BeamSqlAtanExpression(subExps);
+          break;
+        case "COT":
+          ret = new BeamSqlCotExpression(subExps);
+          break;
+        case "DEGREES":
+          ret = new BeamSqlDegreesExpression(subExps);
+          break;
+        case "RADIANS":
+          ret = new BeamSqlRadiansExpression(subExps);
+          break;
+        case "COS":
+          ret = new BeamSqlCosExpression(subExps);
+          break;
+        case "SIN":
+          ret = new BeamSqlSinExpression(subExps);
+          break;
+        case "TAN":
+          ret = new BeamSqlTanExpression(subExps);
+          break;
+        case "SIGN":
+          ret = new BeamSqlSignExpression(subExps);
+          break;
+        case "POWER":
+          ret = new BeamSqlPowerExpression(subExps);
+          break;
+        case "PI":
+          ret = new BeamSqlPiExpression();
+          break;
+        case "ATAN2":
+          ret = new BeamSqlAtan2Expression(subExps);
+          break;
+        case "TRUNCATE":
+          ret = new BeamSqlTruncateExpression(subExps);
+          break;
+        case "RAND":
+          ret = new BeamSqlRandExpression(subExps);
+          break;
+        case "RAND_INTEGER":
+          ret = new BeamSqlRandIntegerExpression(subExps);
+          break;
+
+        // string operators
+        case "||":
+          ret = new BeamSqlConcatExpression(subExps);
+          break;
+        case "POSITION":
+          ret = new BeamSqlPositionExpression(subExps);
+          break;
+        case "CHAR_LENGTH":
+        case "CHARACTER_LENGTH":
+          ret = new BeamSqlCharLengthExpression(subExps);
+          break;
+        case "UPPER":
+          ret = new BeamSqlUpperExpression(subExps);
+          break;
+        case "LOWER":
+          ret = new BeamSqlLowerExpression(subExps);
+          break;
+        case "TRIM":
+          ret = new BeamSqlTrimExpression(subExps);
+          break;
+        case "SUBSTRING":
+          ret = new BeamSqlSubstringExpression(subExps);
+          break;
+        case "OVERLAY":
+          ret = new BeamSqlOverlayExpression(subExps);
+          break;
+        case "INITCAP":
+          ret = new BeamSqlInitCapExpression(subExps);
+          break;
+
+        // date functions
+        // NOTE: the cases below return directly, so the accept() check at the end of
+        // this method is not applied to them.
+        case "Reinterpret":
+          return new BeamSqlReinterpretExpression(subExps, node.type.getSqlTypeName());
+        case "CEIL":
+          // CEIL is shared by numbers and dates, the result type tells them apart
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlCeilExpression(subExps);
+          } else {
+            return new BeamSqlDateCeilExpression(subExps);
+          }
+        case "FLOOR":
+          // FLOOR is shared by numbers and dates, the result type tells them apart
+          if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+            return new BeamSqlFloorExpression(subExps);
+          } else {
+            return new BeamSqlDateFloorExpression(subExps);
+          }
+        case "EXTRACT_DATE":
+        case "EXTRACT":
+          return new BeamSqlExtractExpression(subExps);
+
+        case "LOCALTIME":
+        case "CURRENT_TIME":
+          return new BeamSqlCurrentTimeExpression(subExps);
+
+        case "CURRENT_TIMESTAMP":
+        case "LOCALTIMESTAMP":
+          return new BeamSqlCurrentTimestampExpression(subExps);
+
+        case "CURRENT_DATE":
+          return new BeamSqlCurrentDateExpression();
+
+
+        case "CASE":
+          ret = new BeamSqlCaseExpression(subExps);
+          break;
+        case "CAST":
+          ret = new BeamSqlCastExpression(subExps, node.type.getSqlTypeName());
+          break;
+
+        case "IS NULL":
+          ret = new BeamSqlIsNullExpression(subExps.get(0));
+          break;
+        case "IS NOT NULL":
+          ret = new BeamSqlIsNotNullExpression(subExps.get(0));
+          break;
+
+        // window functions
+        case "HOP":
+        case "TUMBLE":
+        case "SESSION":
+          ret = new BeamSqlWindowExpression(subExps, node.type.getSqlTypeName());
+          break;
+        case "HOP_START":
+        case "TUMBLE_START":
+        case "SESSION_START":
+          ret = new BeamSqlWindowStartExpression();
+          break;
+        case "HOP_END":
+        case "TUMBLE_END":
+        case "SESSION_END":
+          ret = new BeamSqlWindowEndExpression();
+          break;
+        default:
+          //handle UDF
+          if (node.getOperator() instanceof SqlUserDefinedFunction) {
+            SqlUserDefinedFunction udf = (SqlUserDefinedFunction) node.getOperator();
+            ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction();
+            ret = new BeamSqlUdfExpression(fn.method, subExps, node.type.getSqlTypeName());
+          } else {
+            throw new UnsupportedOperationException(
+                "Operator: " + opName + " is not supported yet!");
+          }
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not supported yet!", rexNode.getClass().toString()));
+    }
+
+    // let the expression validate its own operands before it is handed out
+    if (ret != null && !ret.accept()) {
+      throw new IllegalStateException(ret.getClass().getSimpleName()
+          + " does not accept the operands.(" + rexNode + ")");
+    }
+
+    return ret;
+  }
+
+  /** Nothing to set up for this executor. */
+  @Override
+  public void prepare() {
+  }
+
+  /**
+   * Evaluates every expression against the row.
+   *
+   * @param inputRow the record to evaluate
+   * @return the unwrapped values, one per expression, in expression order
+   */
+  @Override
+  public List<Object> execute(BeamSqlRow inputRow) {
+    List<Object> results = new ArrayList<>(exps.size());
+    for (BeamSqlExpression exp : exps) {
+      results.add(exp.evaluate(inputRow).getValue());
+    }
+    return results;
+  }
+
+  /** Nothing to release for this executor. */
+  @Override
+  public void close() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
new file mode 100644
index 0000000..61e8aae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCaseExpression.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ *  {@code BeamSqlCaseExpression} represents CASE, NULLIF, COALESCE in SQL.
+ */
+public class BeamSqlCaseExpression extends BeamSqlExpression {
+  public BeamSqlCaseExpression(List<BeamSqlExpression> operands) {
+    // the return type of CASE is the type of the `else` condition
+    super(operands, operands.get(operands.size() - 1).getOutputType());
+  }
+
+  @Override public boolean accept() {
+    // `when`-`then` pair + `else`
+    if (operands.size() % 2 != 1) {
+      return false;
+    }
+
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opType(i) != SqlTypeName.BOOLEAN) {
+        return false;
+      } else if (opType(i + 1) != outputType) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    for (int i = 0; i < operands.size() - 1; i += 2) {
+      if (opValueEvaluated(i, inputRow)) {
+        return BeamSqlPrimitive.of(
+            outputType,
+            opValueEvaluated(i + 1, inputRow)
+        );
+      }
+    }
+    return BeamSqlPrimitive.of(outputType,
+        opValueEvaluated(operands.size() - 1, inputRow));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
new file mode 100644
index 0000000..c98c10d
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlCastExpression.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.runtime.SqlFunctions;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.DateTimeFormatterBuilder;
+import org.joda.time.format.DateTimeParser;
+
+/**
+ * Base class to support 'CAST' operations for all {@link SqlTypeName}.
+ */
+public class BeamSqlCastExpression extends BeamSqlExpression {
+
+  private static final int index = 0;
+  private static final String outputTimestampFormat = "yyyy-MM-dd HH:mm:ss";
+  private static final String outputDateFormat = "yyyy-MM-dd";
+  /**
+   * Date and Timestamp formats used to parse
+   * {@link SqlTypeName#DATE}, {@link SqlTypeName#TIMESTAMP}.
+   */
+  private static final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+      .append(null/*printer*/, new DateTimeParser[] {
+          // date formats
+          DateTimeFormat.forPattern("yy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yy.MM.dd").getParser(),
+          DateTimeFormat.forPattern("yyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyyMMdd").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd").getParser(),
+          DateTimeFormat.forPattern("yyyy/MM/dd").getParser(),
+          DateTimeFormat.forPattern("yyyy.MM.dd").getParser(),
+          // datetime formats
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ssz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss z").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSSz").getParser(),
+          DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS z").getParser() }).toFormatter()
+      .withPivotYear(2020);
+
+  public BeamSqlCastExpression(List<BeamSqlExpression> operands, SqlTypeName castType) {
+    super(operands, castType);
+  }
+
+  @Override
+  public boolean accept() {
+    return numberOfOperands() == 1;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    SqlTypeName castOutputType = getOutputType();
+    switch (castOutputType) {
+      case INTEGER:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.INTEGER, SqlFunctions.toInt(opValueEvaluated(index, inputRow)));
+      case DOUBLE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DOUBLE, SqlFunctions.toDouble(opValueEvaluated(index, inputRow)));
+      case SMALLINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.SMALLINT, SqlFunctions.toShort(opValueEvaluated(index, inputRow)));
+      case TINYINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.TINYINT, SqlFunctions.toByte(opValueEvaluated(index, inputRow)));
+      case BIGINT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.BIGINT, SqlFunctions.toLong(opValueEvaluated(index, inputRow)));
+      case DECIMAL:
+        return BeamSqlPrimitive.of(SqlTypeName.DECIMAL,
+            SqlFunctions.toBigDecimal(opValueEvaluated(index, inputRow)));
+      case FLOAT:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.FLOAT, SqlFunctions.toFloat(opValueEvaluated(index, inputRow)));
+      case CHAR:
+      case VARCHAR:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.VARCHAR, opValueEvaluated(index, inputRow).toString());
+      case DATE:
+        return BeamSqlPrimitive
+            .of(SqlTypeName.DATE, toDate(opValueEvaluated(index, inputRow), outputDateFormat));
+      case TIMESTAMP:
+        return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+            toTimeStamp(opValueEvaluated(index, inputRow), outputTimestampFormat));
+    }
+    throw new UnsupportedOperationException(
+        String.format("Cast to type %s not supported", castOutputType));
+  }
+
+  private Date toDate(Object inputDate, String outputFormat) {
+    try {
+      return Date
+          .valueOf(dateTimeFormatter.parseLocalDate(inputDate.toString()).toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Date'");
+    }
+  }
+
+  private Timestamp toTimeStamp(Object inputTimestamp, String outputFormat) {
+    try {
+      return Timestamp.valueOf(
+          dateTimeFormatter.parseDateTime(inputTimestamp.toString()).secondOfMinute()
+              .roundCeilingCopy().toString(outputFormat));
+    } catch (IllegalArgumentException | UnsupportedOperationException e) {
+      throw new UnsupportedOperationException("Can't be cast to type 'Timestamp'");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
new file mode 100644
index 0000000..dc5db81
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlExpression.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} is an equivalent expression in BeamSQL, of {@link RexNode} in Calcite.
+ *
+ * <p>An implementation of {@link BeamSqlExpression} takes one or more {@code BeamSqlExpression}
+ * as its operands, and return a value with type {@link SqlTypeName}.
+ *
+ * <p>Leaf expressions may carry no operand list at all, in which case {@link #getOperands()}
+ * returns {@code null} and {@link #numberOfOperands()} returns 0.
+ */
+public abstract class BeamSqlExpression implements Serializable {
+  protected List<BeamSqlExpression> operands;
+  protected SqlTypeName outputType;
+
+  /** Leaves both the operands and the output type unset. */
+  protected BeamSqlExpression(){}
+
+  /**
+   * @param operands the sub-expressions this expression works on, may be {@code null}
+   * @param outputType the type of the value produced by {@link #evaluate(BeamSqlRow)}
+   */
+  public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    this.operands = operands;
+    this.outputType = outputType;
+  }
+
+  /** Returns the operand at position {@code idx}. */
+  public BeamSqlExpression op(int idx) {
+    return operands.get(idx);
+  }
+
+  /** Returns the output type of the operand at position {@code idx}. */
+  public SqlTypeName opType(int idx) {
+    return op(idx).getOutputType();
+  }
+
+  /**
+   * Evaluates the operand at position {@code idx} against {@code row} and returns the
+   * unwrapped value, cast to the type expected by the caller.
+   */
+  // The cast is unchecked: the caller picks T, typically based on opType(idx).
+  @SuppressWarnings("unchecked")
+  public <T> T opValueEvaluated(int idx, BeamSqlRow row) {
+    return (T) op(idx).evaluate(row).getValue();
+  }
+
+  /**
+   * assertion to make sure the input and output are supported in this expression.
+   */
+  public abstract boolean accept();
+
+  /**
+   * Apply input record {@link BeamSqlRow} to this expression,
+   * the output value is wrapped with {@link BeamSqlPrimitive}.
+   */
+  public abstract BeamSqlPrimitive evaluate(BeamSqlRow inputRow);
+
+  public List<BeamSqlExpression> getOperands() {
+    return operands;
+  }
+
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  /** Returns the number of operands, 0 when no operand list was provided. */
+  public int numberOfOperands() {
+    // leaf expressions are built without an operand list
+    return operands == null ? 0 : operands.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
new file mode 100644
index 0000000..7aba024
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlInputRefExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * An primitive operation for direct field extraction.
+ */
+public class BeamSqlInputRefExpression extends BeamSqlExpression {
+  private int inputRef;
+
+  public BeamSqlInputRefExpression(SqlTypeName sqlTypeName, int inputRef) {
+    super(null, sqlTypeName);
+    this.inputRef = inputRef;
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(outputType, inputRow.getFieldValue(inputRef));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
new file mode 100644
index 0000000..6380af9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlPrimitive.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+/**
+ * {@link BeamSqlPrimitive} is a special, self-referencing {@link BeamSqlExpression}.
+ * It holds a value and returns itself from {@link #evaluate(BeamSqlRow)}.
+ *
+ */
+public class BeamSqlPrimitive<T> extends BeamSqlExpression {
+  private T value;
+
+  private BeamSqlPrimitive() {
+  }
+
+  private BeamSqlPrimitive(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * A builder function to create an instance from a type and a value directly.
+   *
+   * @throws IllegalArgumentException if {@link #accept()} rejects the value for the type
+   * @throws UnsupportedOperationException if the type is not handled by {@link #accept()}
+   */
+  public static <T> BeamSqlPrimitive<T> of(SqlTypeName outputType, T value){
+    BeamSqlPrimitive<T> primitive = new BeamSqlPrimitive<>();
+    primitive.outputType = outputType;
+    primitive.value = value;
+    if (primitive.accept()) {
+      return primitive;
+    }
+    throw new IllegalArgumentException(
+        String.format("value [%s] doesn't match type [%s].", value, outputType));
+  }
+
+  @Override
+  public SqlTypeName getOutputType() {
+    return outputType;
+  }
+
+  public T getValue() {
+    return value;
+  }
+
+  // The typed accessors below cast the held value; they fail with a ClassCastException when
+  // the value has a different runtime type, and the primitive-returning ones fail with a
+  // NullPointerException when the value is null (unboxing).
+
+  public long getLong() {
+    return (Long) getValue();
+  }
+
+  public double getDouble() {
+    return (Double) getValue();
+  }
+
+  public float getFloat() {
+    return (Float) getValue();
+  }
+
+  public int getInteger() {
+    return (Integer) getValue();
+  }
+
+  public short getShort() {
+    return (Short) getValue();
+  }
+
+  public byte getByte() {
+    return (Byte) getValue();
+  }
+
+  public boolean getBoolean() {
+    return (Boolean) getValue();
+  }
+
+  public String getString() {
+    return (String) getValue();
+  }
+
+  public Date getDate() {
+    return (Date) getValue();
+  }
+
+  public BigDecimal getDecimal() {
+    return (BigDecimal) getValue();
+  }
+
+  /**
+   * Checks that the held value has the Java type expected for the output type.
+   *
+   * <p>A {@code null} value is accepted for every type.
+   *
+   * @throws UnsupportedOperationException if the output type is not one of the handled types
+   */
+  @Override
+  public boolean accept() {
+    if (value == null) {
+      return true;
+    }
+
+    switch (outputType) {
+    case TINYINT:
+      return value instanceof Byte;
+    case SMALLINT:
+      return value instanceof Short;
+    case INTEGER:
+      return value instanceof Integer;
+    case BIGINT:
+      return value instanceof Long;
+    case FLOAT:
+      return value instanceof Float;
+    case DOUBLE:
+      return value instanceof Double;
+    case DECIMAL:
+    case INTERVAL_HOUR:
+    case INTERVAL_MINUTE:
+      return value instanceof BigDecimal;
+    case BOOLEAN:
+      return value instanceof Boolean;
+    case CHAR:
+    case VARCHAR:
+      return value instanceof String || value instanceof NlsString;
+    case TIME:
+      return value instanceof GregorianCalendar;
+    case DATE:
+    case TIMESTAMP:
+      return value instanceof Date;
+    case SYMBOL:
+      // for SYMBOL, it supports anything...
+      return true;
+    default:
+      throw new UnsupportedOperationException(outputType.name());
+    }
+  }
+
+  /**
+   * Returns this instance; the input row is not consulted.
+   */
+  @Override
+  public BeamSqlPrimitive<T> evaluate(BeamSqlRow inputRow) {
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..243baaa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
+ * to {@code BIGINT} is supported.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    return getOperands().size() == 1
+        && outputType == SqlTypeName.BIGINT
+        && SqlTypeName.DATETIME_TYPES.contains(opType(0));
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (opType(0) == SqlTypeName.TIME) {
+      GregorianCalendar date = opValueEvaluated(0, inputRow);
+      return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
+
+    } else {
+      Date date = opValueEvaluated(0, inputRow);
+      return BeamSqlPrimitive.of(outputType, date.getTime());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
new file mode 100644
index 0000000..eebb97c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * invoke a UDF function.
+ */
+public class BeamSqlUdfExpression extends BeamSqlExpression {
+  //as Method is not Serializable, need to keep class/method information, and rebuild it.
+  private transient Method method;
+  private String className;
+  private String methodName;
+  private List<String> paraClassName = new ArrayList<>();
+
+  public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps,
+      SqlTypeName sqlTypeName) {
+    super(subExps, sqlTypeName);
+    this.method = method;
+
+    this.className = method.getDeclaringClass().getName();
+    this.methodName = method.getName();
+    for (Class<?> c : method.getParameterTypes()) {
+      paraClassName.add(c.getName());
+    }
+  }
+
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  @Override
+  public BeamSqlPrimitive evaluate(BeamSqlRow inputRow) {
+    if (method == null) {
+      reConstructMethod();
+    }
+    try {
+      List<Object> paras = new ArrayList<>();
+      for (BeamSqlExpression e : getOperands()) {
+        paras.add(e.evaluate(inputRow).getValue());
+      }
+
+      return BeamSqlPrimitive.of(getOutputType(),
+          method.invoke(null, paras.toArray(new Object[]{})));
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * re-construct method from class/method.
+   */
+  private void reConstructMethod() {
+    try {
+      List<Class<?>> paraClass = new ArrayList<>();
+      for (String pc : paraClassName) {
+        paraClass.add(Class.forName(pc));
+      }
+      method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {}));
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
new file mode 100644
index 0000000..0bd68df
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code SESSION_END} operation.
+ *
+ * <p>These operators return the <em>end</em> timestamp of the window.
+ */
+public class BeamSqlWindowEndExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowEndExpression() {
+    // evaluate() always produces a TIMESTAMP, so report that type from getOutputType()
+    // instead of leaving it null.
+    this.outputType = SqlTypeName.TIMESTAMP;
+  }
+
+  /**
+   * Always accepts; nothing is validated here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Returns the window end of {@code inputRow} as a {@code TIMESTAMP} primitive.
+   */
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRow.getWindowEnd().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
new file mode 100644
index 0000000..b560ef8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} operation.
+ *
+ * <p>These functions don't change the timestamp field, instead it's used to indicate
+ * the event_timestamp field, and how the window is defined.
+ */
+public class BeamSqlWindowExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override
+  public boolean accept() {
+    return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
+        || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
+  }
+
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        (Date) operands.get(0).evaluate(inputRow).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
new file mode 100644
index 0000000..e2c1b34
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
+
+import java.util.Date;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START},
+ * {@code SESSION_START} operation.
+ *
+ * <p>These operators return the <em>start</em> timestamp of the window.
+ */
+public class BeamSqlWindowStartExpression extends BeamSqlExpression {
+
+  public BeamSqlWindowStartExpression() {
+    // evaluate() always produces a TIMESTAMP, so report that type from getOutputType()
+    // instead of leaving it null.
+    this.outputType = SqlTypeName.TIMESTAMP;
+  }
+
+  /**
+   * Always accepts; nothing is validated here.
+   */
+  @Override
+  public boolean accept() {
+    return true;
+  }
+
+  /**
+   * Returns the window start of {@code inputRow} as a {@code TIMESTAMP} primitive.
+   */
+  @Override
+  public BeamSqlPrimitive<Date> evaluate(BeamSqlRow inputRow) {
+    return BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP,
+        new Date(inputRow.getWindowStart().getMillis()));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
new file mode 100644
index 0000000..b07b28f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlArithmeticExpression.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.sdk.extensions.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Base class for all arithmetic operators.
+ *
+ * <p>Subclasses supply the actual operation through {@link #calc(BigDecimal, BigDecimal)};
+ * this class converts the two operands, and converts the result back to the output type.
+ */
+public abstract class BeamSqlArithmeticExpression extends BeamSqlExpression {
+  // Numeric types in the order used by deduceOutputType: of two operand types, the one
+  // that appears later in this list wins.
+  private static final List<SqlTypeName> ORDERED_APPROX_TYPES = new ArrayList<>();
+  static {
+    ORDERED_APPROX_TYPES.add(SqlTypeName.TINYINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.SMALLINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.INTEGER);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.BIGINT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.FLOAT);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DOUBLE);
+    ORDERED_APPROX_TYPES.add(SqlTypeName.DECIMAL);
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands) {
+    super(operands, deduceOutputType(operands.get(0).getOutputType(),
+        operands.get(1).getOutputType()));
+  }
+
+  protected BeamSqlArithmeticExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  /**
+   * Evaluates both operands, applies {@link #calc(BigDecimal, BigDecimal)} and converts the
+   * result to the output type.
+   */
+  @Override public BeamSqlPrimitive<? extends Number> evaluate(BeamSqlRow inputRow) {
+    BigDecimal lhs = operandAsBigDecimal(0, inputRow);
+    BigDecimal rhs = operandAsBigDecimal(1, inputRow);
+    return getCorrectlyTypedResult(calc(lhs, rhs));
+  }
+
+  /**
+   * Evaluates the operand at {@code idx} and converts it to a {@link BigDecimal}.
+   *
+   * <p>The value is parsed from its string form as a {@code double} first, so it is limited
+   * to double precision.
+   */
+  private BigDecimal operandAsBigDecimal(int idx, BeamSqlRow inputRow) {
+    Object raw = opValueEvaluated(idx, inputRow);
+    return BigDecimal.valueOf(Double.valueOf(raw.toString()));
+  }
+
+  protected abstract BigDecimal calc(BigDecimal left, BigDecimal right);
+
+  /**
+   * Picks the result type for a pair of operand types.
+   *
+   * <p>{@code FLOAT} combined with {@code DECIMAL} gives {@code DOUBLE}; otherwise the type
+   * listed later in the ordering wins, and {@code left} is returned on a tie.
+   */
+  protected static SqlTypeName deduceOutputType(SqlTypeName left, SqlTypeName right) {
+    boolean hasFloat = left == SqlTypeName.FLOAT || right == SqlTypeName.FLOAT;
+    boolean hasDecimal = left == SqlTypeName.DECIMAL || right == SqlTypeName.DECIMAL;
+    if (hasFloat && hasDecimal) {
+      return SqlTypeName.DOUBLE;
+    }
+
+    int leftRank = ORDERED_APPROX_TYPES.indexOf(left);
+    int rightRank = ORDERED_APPROX_TYPES.indexOf(right);
+    return leftRank < rightRank ? right : left;
+  }
+
+  /**
+   * Accepts exactly two operands, both of a numeric type.
+   */
+  @Override public boolean accept() {
+    if (operands.size() != 2) {
+      return false;
+    }
+
+    for (BeamSqlExpression operand : operands) {
+      boolean numeric = SqlTypeName.NUMERIC_TYPES.contains(operand.getOutputType());
+      if (!numeric) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Converts {@code rawResult} to the Java number type matching the output type and wraps it.
+   */
+  protected BeamSqlPrimitive<? extends Number> getCorrectlyTypedResult(BigDecimal rawResult) {
+    Number typedValue = toOutputNumber(rawResult);
+    return BeamSqlPrimitive.of(outputType, typedValue);
+  }
+
+  /**
+   * Narrows {@code rawResult} for the integral and floating-point output types and returns
+   * it unchanged for every other type.
+   */
+  private Number toOutputNumber(BigDecimal rawResult) {
+    switch (outputType) {
+      case TINYINT:
+        return rawResult.byteValue();
+      case SMALLINT:
+        return rawResult.shortValue();
+      case INTEGER:
+        return rawResult.intValue();
+      case BIGINT:
+        return rawResult.longValue();
+      case FLOAT:
+        return rawResult.floatValue();
+      case DOUBLE:
+        return rawResult.doubleValue();
+      case DECIMAL:
+      default:
+        return rawResult;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/febd044a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
new file mode 100644
index 0000000..d62a3f8
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/arithmetic/BeamSqlDivideExpression.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
+
+/**
+ * '/' operator.
+ */
+public class BeamSqlDivideExpression extends BeamSqlArithmeticExpression {
+  public BeamSqlDivideExpression(List<BeamSqlExpression> operands) {
+    super(operands);
+  }
+
+  @Override protected BigDecimal calc(BigDecimal left, BigDecimal right) {
+    return left.divide(right, 10, RoundingMode.HALF_EVEN);
+  }
+}